import logging
import os
import json
import string
import random
from unittest.mock import patch

from SpiffWorkflow import Task as SpiffTask
from SpiffWorkflow.bpmn.specs.EndEvent import EndEvent
from SpiffWorkflow.camunda.specs.UserTask import Form, FormField
from SpiffWorkflow.specs import TaskSpec

from crc import session, db, app
from crc.api.common import ApiError
from crc.models.file import FileModel, FileDataModel, CONTENT_TYPES
from crc.models.study import StudyModel
from crc.models.workflow import WorkflowSpecModel, WorkflowStatus, WorkflowModel
from crc.services.file_service import FileService
from crc.services.study_service import StudyService
from crc.models.protocol_builder import ProtocolBuilderStudySchema, ProtocolBuilderInvestigatorSchema, \
    ProtocolBuilderRequiredDocumentSchema
from crc.services.workflow_service import WorkflowService
from tests.base_test import BaseTest
from crc.services.workflow_processor import WorkflowProcessor

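# Exercises the WorkflowProcessor end to end: creating workflows from BPMN
# specs, completing user and engine tasks, versioning specs, and recovering
# from spec changes via soft and hard resets.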
class TestWorkflowProcessor(BaseTest):

    def _populate_form_with_random_data(self, task):
        api_task = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
        WorkflowProcessor.populate_form_with_random_data(task, api_task)

    def get_processor(self, study_model, spec_model):
        workflow_model = StudyService._create_workflow_model(study_model, spec_model)
        return WorkflowProcessor(workflow_model)

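    # Walks the "random_fact" workflow through its full lifecycle:
    # user_input_required -> waiting -> complete, with the FactService
    # script task contributing data along the way.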
    def test_create_and_complete_workflow(self):
        self.load_example_data()
        workflow_spec_model = self.load_test_spec("random_fact")
        study = session.query(StudyModel).first()
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(study.id, processor.bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY])
        self.assertIsNotNone(processor)
        self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(1, len(next_user_tasks))
        task = next_user_tasks[0]
        self.assertEqual("Task_User_Select_Type", task.get_name())
        model = {"type": "buzzword"}
        if task.data is None:
            task.data = {}
        task.data.update(model)
        processor.complete_task(task)
        self.assertEqual(WorkflowStatus.waiting, processor.get_status())
        processor.do_engine_steps()
        self.assertEqual(WorkflowStatus.complete, processor.get_status())
        data = processor.get_data()
        self.assertIsNotNone(data)
        self.assertIn("FactService", data)

    def test_workflow_with_dmn(self):
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("decision_table")
        files = session.query(FileModel).filter_by(workflow_spec_id='decision_table').all()
        self.assertEqual(2, len(files))
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(1, len(next_user_tasks))
        task = next_user_tasks[0]
        self.assertEqual("get_num_presents", task.get_name())
        model = {"num_presents": 1}
        if task.data is None:
            task.data = {}
        task.data.update(model)
        processor.complete_task(task)
        processor.do_engine_steps()
        data = processor.get_data()
        self.assertIsNotNone(data)
        self.assertIn("message", data)
        self.assertEqual("Oh, Ginger.", data.get('message'))
        self.assertEqual("End", processor.bpmn_workflow.last_task.task_spec.name)
        self.assertEqual("Oh, Ginger.", processor.bpmn_workflow.last_task.data.get('message'))

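    # The "parallel_tasks" spec fans out into four parallel user tasks, each
    # with a follow-up form, before joining into a single final step.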
    def test_workflow_with_parallel_forms(self):
        self.load_example_data()
        workflow_spec_model = self.load_test_spec("parallel_tasks")
        study = session.query(StudyModel).first()
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())

        # Complete the first steps of the 4 parallel tasks
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(4, len(next_user_tasks))
        for task in next_user_tasks:
            self._populate_form_with_random_data(task)
        for task in next_user_tasks:
            processor.complete_task(task)

        # There are another 4 tasks to complete (each parallel task has a follow-up task)
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(4, len(next_user_tasks))
        for task in next_user_tasks:
            self._populate_form_with_random_data(task)
        for task in next_user_tasks:
            processor.complete_task(task)
        processor.do_engine_steps()

        # Should be one last step after the above are complete
        final_user_tasks = processor.next_user_tasks()
        self.assertEqual(1, len(final_user_tasks))
        self._populate_form_with_random_data(final_user_tasks[0])
        processor.complete_task(final_user_tasks[0])

        processor.do_engine_steps()
        self.assertTrue(processor.bpmn_workflow.is_completed())

    def test_workflow_processor_knows_the_next_task_even_when_parallel(self):
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("parallel_tasks")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(4, len(next_user_tasks))
        self.assertEqual(next_user_tasks[0], processor.next_task(), "First task in list of 4")

        # Complete the third open task, deliberately out of order. This should
        # cause the system to recommend the first ready task that is a child of
        # the last completed task.
        task = next_user_tasks[2]
        self._populate_form_with_random_data(task)
        processor.complete_task(task)
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(processor.bpmn_workflow.last_task, task)
        self.assertEqual(4, len(next_user_tasks))
        self.assertEqual(task.children[0], processor.next_task())

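    # Once the workflow is complete, next_task() should still return something
    # useful (the EndEvent), so callers can read the final data.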
    def test_workflow_processor_returns_next_task_as_end_task_if_complete(self):
        self.load_example_data()
        workflow_spec_model = self.load_test_spec("random_fact")
        study = session.query(StudyModel).first()
        processor = self.get_processor(study, workflow_spec_model)
        processor.do_engine_steps()
        task = processor.next_task()
        task.data = {"type": "buzzword"}
        processor.complete_task(task)
        self.assertEqual(WorkflowStatus.waiting, processor.get_status())
        processor.do_engine_steps()
        self.assertEqual(WorkflowStatus.complete, processor.get_status())
        task = processor.next_task()
        self.assertIsNotNone(task)
        self.assertIn("FactService", task.data)
        self.assertIsInstance(task.task_spec, EndEvent)

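    # A spec that can't be parsed should fail at processor creation with a
    # "workflow_validation_error" that names the offending BPMN element.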
    def test_workflow_validation_error_is_properly_raised(self):
        self.load_example_data()
        workflow_spec_model = self.load_test_spec("invalid_spec")
        study = session.query(StudyModel).first()
        with self.assertRaises(ApiError) as context:
            self.get_processor(study, workflow_spec_model)
        self.assertEqual("workflow_validation_error", context.exception.code)
        self.assertIn("bpmn:startEvent", context.exception.message)

    def test_workflow_spec_key_error(self):
        """We frequently see 'Key' errors in the logs, where a workflow references
        something that doesn't exist in the midst of processing. Make sure we
        produce errors for the front end that allow us to debug this."""
        # Start the two_forms workflow, and enter some data in the first form.
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("two_forms")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
        task = processor.next_task()
        task.data = {"color": "blue"}
        processor.complete_task(task)

        # Modify the specification, with a major change.
        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
        self.replace_file("two_forms.bpmn", file_path)

        # Attempting a soft update on a structural change should raise a sensible error.
        with self.assertRaises(ApiError) as context:
            WorkflowProcessor(processor.workflow_model, soft_reset=True)
        self.assertEqual("unexpected_workflow_structure", context.exception.code)

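    # Errors raised while evaluating expressions during engine steps should
    # surface as an ApiError with the "task_error" code, not a raw exception.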
    def test_workflow_with_bad_expression_raises_sensible_error(self):
        self.load_example_data()

        workflow_spec_model = self.load_test_spec("invalid_expression")
        study = session.query(StudyModel).first()
        processor = self.get_processor(study, workflow_spec_model)
        processor.do_engine_steps()
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(1, len(next_user_tasks))
        self._populate_form_with_random_data(next_user_tasks[0])
        processor.complete_task(next_user_tasks[0])
        with self.assertRaises(ApiError) as context:
            processor.do_engine_steps()
        self.assertEqual("task_error", context.exception.code)

    def test_workflow_with_docx_template(self):
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("docx")
        files = session.query(FileModel).filter_by(workflow_spec_id='docx').all()
        self.assertEqual(2, len(files))
        workflow_spec_model = session.query(WorkflowSpecModel).filter_by(id="docx").first()
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(WorkflowStatus.user_input_required, processor.get_status())
        next_user_tasks = processor.next_user_tasks()
        self.assertEqual(1, len(next_user_tasks))
        task = next_user_tasks[0]
        self.assertEqual("task_gather_information", task.get_name())
        self._populate_form_with_random_data(task)
        processor.complete_task(task)

        files = session.query(FileModel).filter_by(study_id=study.id, workflow_id=processor.get_workflow_id()).all()
        self.assertEqual(0, len(files))
        processor.do_engine_steps()
        files = session.query(FileModel).filter_by(study_id=study.id, workflow_id=processor.get_workflow_id()).all()
        self.assertEqual(1, len(files), "The task should create a new file.")
        file_data = session.query(FileDataModel).filter(FileDataModel.file_model_id == files[0].id).first()
        self.assertIsNotNone(file_data.data)
        self.assertTrue(len(file_data.data) > 0)
        # Not going any further here, assuming this is tested correctly in the libraries.

    def test_load_study_information(self):
        """Test a workflow that includes requests to pull in Study Details."""

        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("study_details")
        processor = self.get_processor(study, workflow_spec_model)
        processor.do_engine_steps()
        task = processor.bpmn_workflow.last_task
        self.assertIsNotNone(task.data)
        self.assertIn("StudyInfo", task.data)
        self.assertIn("info", task.data["StudyInfo"])
        self.assertIn("title", task.data["StudyInfo"]["info"])
        self.assertIn("last_updated", task.data["StudyInfo"]["info"])
        self.assertIn("sponsor", task.data["StudyInfo"]["info"])

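    # The version string appears to encode the primary BPMN file's version
    # followed by the versions of its supporting files (e.g. "v1.1" reads as
    # the main file at version 1 plus one dependency at version 1); this
    # reading is inferred from the assertions below, not documented behavior.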
    def test_spec_versioning(self):
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("decision_table")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertTrue(processor.get_spec_version().startswith('v1.1'))
        file_service = FileService()

        file_service.add_workflow_spec_file(workflow_spec_model, "new_file.txt", "txt", b'blahblah')
        processor = self.get_processor(study, workflow_spec_model)
        self.assertTrue(processor.get_spec_version().startswith('v1.1.1'))

        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'docx', 'docx.bpmn')
        with open(file_path, "rb") as file:
            data = file.read()

        file_model = db.session.query(FileModel).filter(FileModel.name == "decision_table.bpmn").first()
        file_service.update_file(file_model, data, "txt")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertTrue(processor.get_spec_version().startswith('v2.1.1'))

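    # A hard reset rebuilds the workflow from the latest spec, returning the
    # user to the first task while preserving previously entered data.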
    def test_restart_workflow(self):
        self.load_example_data()
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("two_forms")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
        task = processor.next_task()
        task.data = {"key": "Value"}
        processor.complete_task(task)
        task_before_restart = processor.next_task()
        processor.hard_reset()
        task_after_restart = processor.next_task()

        self.assertNotEqual(task.get_name(), task_before_restart.get_name())
        self.assertEqual(task.get_name(), task_after_restart.get_name())
        self.assertEqual(task.data, task_after_restart.data)

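    # A soft reset re-parses the latest spec without restarting the workflow,
    # so minor text changes are picked up in place; an existing processor
    # won't see the change until it is reconstructed with soft_reset=True.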
    def test_soft_reset(self):
        self.load_example_data()

        # Start the two_forms workflow, and enter some data in the first form.
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("two_forms")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
        task = processor.next_task()
        task.data = {"color": "blue"}
        processor.complete_task(task)

        # Modify the specification, with a minor text change.
        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn')
        self.replace_file("two_forms.bpmn", file_path)

        # Setting up another processor should not error out, but doesn't pick up the update.
        processor.workflow_model.bpmn_workflow_json = processor.serialize()
        processor2 = WorkflowProcessor(processor.workflow_model)
        self.assertEqual("Step 1", processor2.bpmn_workflow.last_task.task_spec.description)
        self.assertNotEqual("# This is some documentation I wanted to add.",
                            processor2.bpmn_workflow.last_task.task_spec.documentation)

        # You can do a soft update and get the right response.
        processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
        self.assertEqual("Step 1", processor3.bpmn_workflow.last_task.task_spec.description)
        self.assertEqual("# This is some documentation I wanted to add.",
                         processor3.bpmn_workflow.last_task.task_spec.documentation)

    def test_hard_reset(self):
        self.load_example_data()

        # Start the two_forms workflow, and enter some data in the first form.
        study = session.query(StudyModel).first()
        workflow_spec_model = self.load_test_spec("two_forms")
        processor = self.get_processor(study, workflow_spec_model)
        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
        task = processor.next_task()
        task.data = {"color": "blue"}
        processor.complete_task(task)
        next_task = processor.next_task()
        self.assertEqual("Step 2", next_task.task_spec.description)

        # Modify the specification, with a major change that alters the flow and can't be serialized effectively.
        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
        self.replace_file("two_forms.bpmn", file_path)

        # Assure that creating a new processor doesn't cause any issues, and maintains the spec version.
        processor.workflow_model.bpmn_workflow_json = processor.serialize()
        processor2 = WorkflowProcessor(processor.workflow_model)
        self.assertTrue(processor2.get_spec_version().startswith("v1 "))  # Still at version 1.

        # Do a hard reset, which should bring us back to the beginning, but retain the data.
        processor3 = WorkflowProcessor(processor.workflow_model, hard_reset=True)
        self.assertEqual("Step 1", processor3.next_task().task_spec.description)
        self.assertEqual({"color": "blue"}, processor3.next_task().data)
        processor3.complete_task(processor3.next_task())
        self.assertEqual("New Step", processor3.next_task().task_spec.description)
        self.assertEqual("blue", processor3.next_task().data["color"])

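    # The latest version string can be computed for a spec directly, without
    # instantiating a workflow.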
    def test_get_latest_spec_version(self):
        self.load_test_spec("two_forms")
        version = WorkflowProcessor.get_latest_version_string("two_forms")
        self.assertTrue(version.startswith("v1 "))

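    # The master (top-level) workflow decides which sub-workflows are
    # required or disabled. All four Protocol Builder endpoints are mocked
    # with canned JSON responses so the test runs without the external service.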
    @patch('crc.services.protocol_builder.ProtocolBuilderService.get_studies')
    @patch('crc.services.protocol_builder.ProtocolBuilderService.get_investigators')
    @patch('crc.services.protocol_builder.ProtocolBuilderService.get_required_docs')
    @patch('crc.services.protocol_builder.ProtocolBuilderService.get_study_details')
    def test_master_bpmn(self, mock_details, mock_required_docs, mock_investigators, mock_studies):

        # Mock the Protocol Builder responses.
        studies_response = self.protocol_builder_response('user_studies.json')
        mock_studies.return_value = ProtocolBuilderStudySchema(many=True).loads(studies_response)

        investigators_response = self.protocol_builder_response('investigators.json')
        mock_investigators.return_value = json.loads(investigators_response)

        required_docs_response = self.protocol_builder_response('required_docs.json')
        mock_required_docs.return_value = json.loads(required_docs_response)

        details_response = self.protocol_builder_response('study_details.json')
        mock_details.return_value = json.loads(details_response)

        self.load_example_data()

        study = session.query(StudyModel).first()
        workflow_spec_model = db.session.query(WorkflowSpecModel).\
            filter(WorkflowSpecModel.name == "top_level_workflow").first()
        self.assertIsNotNone(workflow_spec_model)

        processor = self.get_processor(study, workflow_spec_model)
        processor.do_engine_steps()
        self.assertTrue(processor.bpmn_workflow.is_completed(), "Top level process is fully automatic.")
        data = processor.bpmn_workflow.last_task.data

        logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)

        # It should mark Enter Core Data as required, because it is always required.
        self.assertIn("enter_core_info", data)
        self.assertEqual("required", data["enter_core_info"])

        # It should mark Personnel as required, because StudyInfo.investigators is not empty.
        self.assertIn("personnel", data)
        self.assertEqual("required", data["personnel"])

        # It should mark the sponsor funding source as disabled, since the funding
        # required document (12) is not included in the required docs.
        self.assertIn("sponsor_funding_source", data)
        self.assertEqual("disabled", data["sponsor_funding_source"])

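    # An enum form field with an empty options list can't be populated with
    # random data, so validation should fail loudly rather than guess.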
    def test_enum_with_no_choices_raises_api_error(self):
        self.load_example_data()
        workflow_spec_model = self.load_test_spec("random_fact")
        study = session.query(StudyModel).first()
        processor = self.get_processor(study, workflow_spec_model)
        processor.do_engine_steps()
        tasks = processor.next_user_tasks()
        task = tasks[0]

        field = FormField()
        field.id = "test_enum_field"
        field.type = "enum"
        field.options = []
        task.task_spec.form.fields.append(field)

        with self.assertRaises(ApiError):
            self._populate_form_with_random_data(task)