import logging import os import json import string import random from unittest.mock import patch from SpiffWorkflow import Task as SpiffTask from SpiffWorkflow.bpmn.specs.EndEvent import EndEvent from SpiffWorkflow.camunda.specs.UserTask import Form, FormField from SpiffWorkflow.specs import TaskSpec from crc import session, db, app from crc.api.common import ApiError from crc.models.file import FileModel, FileDataModel, CONTENT_TYPES from crc.models.study import StudyModel from crc.models.workflow import WorkflowSpecModel, WorkflowStatus, WorkflowModel from crc.services.file_service import FileService from crc.services.study_service import StudyService from crc.models.protocol_builder import ProtocolBuilderStudySchema, ProtocolBuilderInvestigatorSchema, \ ProtocolBuilderRequiredDocumentSchema from tests.base_test import BaseTest from crc.services.workflow_processor import WorkflowProcessor class TestWorkflowProcessor(BaseTest): def _populate_form_with_random_data(self, task): WorkflowProcessor.populate_form_with_random_data(task) def get_processor(self, study_model, spec_model): workflow_model = StudyService._create_workflow_model(study_model, spec_model) return WorkflowProcessor(workflow_model) def test_create_and_complete_workflow(self): self.load_example_data() workflow_spec_model = self.load_test_spec("random_fact") study = session.query(StudyModel).first() processor = self.get_processor(study, workflow_spec_model) self.assertEqual(study.id, processor.bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY]) self.assertIsNotNone(processor) self.assertEqual(WorkflowStatus.user_input_required, processor.get_status()) next_user_tasks = processor.next_user_tasks() self.assertEqual(1, len(next_user_tasks)) task = next_user_tasks[0] self.assertEqual("Task_User_Select_Type", task.get_name()) model = {"type": "buzzword"} if task.data is None: task.data = {} task.data.update(model) processor.complete_task(task) self.assertEqual(WorkflowStatus.waiting, processor.get_status()) processor.do_engine_steps() self.assertEqual(WorkflowStatus.complete, processor.get_status()) data = processor.get_data() self.assertIsNotNone(data) self.assertIn("FactService", data) def test_workflow_with_dmn(self): self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("decision_table") files = session.query(FileModel).filter_by(workflow_spec_id='decision_table').all() self.assertEqual(2, len(files)) processor = self.get_processor(study, workflow_spec_model) self.assertEqual(WorkflowStatus.user_input_required, processor.get_status()) next_user_tasks = processor.next_user_tasks() self.assertEqual(1, len(next_user_tasks)) task = next_user_tasks[0] self.assertEqual("get_num_presents", task.get_name()) model = {"num_presents": 1} if task.data is None: task.data = {} task.data.update(model) processor.complete_task(task) processor.do_engine_steps() data = processor.get_data() self.assertIsNotNone(data) self.assertIn("message", data) self.assertEqual("Oh, Ginger.", data.get('message')) self.assertEqual("End", processor.bpmn_workflow.last_task.task_spec.name) self.assertEqual("Oh, Ginger.", processor.bpmn_workflow.last_task.data.get('message')) def test_workflow_with_parallel_forms(self): self.load_example_data() workflow_spec_model = self.load_test_spec("parallel_tasks") study = session.query(StudyModel).first() processor = self.get_processor(study, workflow_spec_model) self.assertEqual(WorkflowStatus.user_input_required, processor.get_status()) # Complete the first steps of the 4 parallel tasks next_user_tasks = processor.next_user_tasks() self.assertEqual(4, len(next_user_tasks)) self._populate_form_with_random_data(next_user_tasks[0]) self._populate_form_with_random_data(next_user_tasks[1]) self._populate_form_with_random_data(next_user_tasks[2]) self._populate_form_with_random_data(next_user_tasks[3]) processor.complete_task(next_user_tasks[0]) processor.complete_task(next_user_tasks[1]) processor.complete_task(next_user_tasks[2]) processor.complete_task(next_user_tasks[3]) # There are another 4 tasks to complete (each parallel task has a follow-up task) next_user_tasks = processor.next_user_tasks() self.assertEqual(4, len(next_user_tasks)) self._populate_form_with_random_data(next_user_tasks[0]) self._populate_form_with_random_data(next_user_tasks[1]) self._populate_form_with_random_data(next_user_tasks[2]) self._populate_form_with_random_data(next_user_tasks[3]) processor.complete_task(next_user_tasks[0]) processor.complete_task(next_user_tasks[1]) processor.complete_task(next_user_tasks[2]) processor.complete_task(next_user_tasks[3]) processor.do_engine_steps() # Should be one last step after the above are complete final_user_tasks = processor.next_user_tasks() self.assertEqual(1, len(final_user_tasks)) self._populate_form_with_random_data(final_user_tasks[0]) processor.complete_task(final_user_tasks[0]) processor.do_engine_steps() self.assertTrue(processor.bpmn_workflow.is_completed()) def test_workflow_processor_knows_the_text_task_even_when_parallel(self): self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("parallel_tasks") processor = self.get_processor(study, workflow_spec_model) self.assertEqual(WorkflowStatus.user_input_required, processor.get_status()) next_user_tasks = processor.next_user_tasks() self.assertEqual(4, len(next_user_tasks)) self.assertEqual(next_user_tasks[0], processor.next_task(), "First task in list of 4") # Complete the third open task, so do things out of order # this should cause the system to recommend the first ready task that is a # child of the last completed task. task = next_user_tasks[2] self._populate_form_with_random_data(task) processor.complete_task(task) next_user_tasks = processor.next_user_tasks() self.assertEqual(processor.bpmn_workflow.last_task, task) self.assertEqual(4, len(next_user_tasks)) self.assertEqual(task.children[0], processor.next_task()) def test_workflow_processor_returns_next_task_as_end_task_if_complete(self): self.load_example_data() workflow_spec_model = self.load_test_spec("random_fact") study = session.query(StudyModel).first() processor = self.get_processor(study, workflow_spec_model) processor.do_engine_steps() task = processor.next_task() task.data = {"type": "buzzword"} processor.complete_task(task) self.assertEqual(WorkflowStatus.waiting, processor.get_status()) processor.do_engine_steps() self.assertEqual(WorkflowStatus.complete, processor.get_status()) task = processor.next_task() self.assertIsNotNone(task) self.assertIn("FactService", task.data) self.assertIsInstance(task.task_spec, EndEvent) def test_workflow_validation_error_is_properly_raised(self): self.load_example_data() workflow_spec_model = self.load_test_spec("invalid_spec") study = session.query(StudyModel).first() with self.assertRaises(ApiError) as context: self.get_processor(study, workflow_spec_model) self.assertEqual("workflow_validation_error", context.exception.code) self.assertTrue("bpmn:startEvent" in context.exception.message) def test_workflow_spec_key_error(self): """Frequently seeing errors in the logs about a 'Key' error, where a workflow references something that doesn't exist in the midst of processing. Want to make sure we produce errors to the front end that allows us to debug this.""" # Start the two_forms workflow, and enter some data in the first form. self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("two_forms") processor = self.get_processor(study, workflow_spec_model) self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id) task = processor.next_task() task.data = {"color": "blue"} processor.complete_task(task) # Modify the specification, with a major change. file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn') self.replace_file("two_forms.bpmn", file_path) # Attemping a soft update on a structural change should raise a sensible error. with self.assertRaises(ApiError) as context: processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True) self.assertEqual("unexpected_workflow_structure", context.exception.code) def test_workflow_with_bad_expression_raises_sensible_error(self): self.load_example_data() workflow_spec_model = self.load_test_spec("invalid_expression") study = session.query(StudyModel).first() processor = self.get_processor(study, workflow_spec_model) processor.do_engine_steps() next_user_tasks = processor.next_user_tasks() self.assertEqual(1, len(next_user_tasks)) self._populate_form_with_random_data(next_user_tasks[0]) processor.complete_task(next_user_tasks[0]) with self.assertRaises(ApiError) as context: processor.do_engine_steps() self.assertEqual("invalid_expression", context.exception.code) def test_workflow_with_docx_template(self): self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("docx") files = session.query(FileModel).filter_by(workflow_spec_id='docx').all() self.assertEqual(2, len(files)) workflow_spec_model = session.query(WorkflowSpecModel).filter_by(id="docx").first() processor = self.get_processor(study, workflow_spec_model) self.assertEqual(WorkflowStatus.user_input_required, processor.get_status()) next_user_tasks = processor.next_user_tasks() self.assertEqual(1, len(next_user_tasks)) task = next_user_tasks[0] self.assertEqual("task_gather_information", task.get_name()) self._populate_form_with_random_data(task) processor.complete_task(task) files = session.query(FileModel).filter_by(study_id=study.id, workflow_id=processor.get_workflow_id()).all() self.assertEqual(0, len(files)) processor.do_engine_steps() files = session.query(FileModel).filter_by(study_id=study.id, workflow_id=processor.get_workflow_id()).all() self.assertEqual(1, len(files), "The task should create a new file.") file_data = session.query(FileDataModel).filter(FileDataModel.file_model_id == files[0].id).first() self.assertIsNotNone(file_data.data) self.assertTrue(len(file_data.data) > 0) # Not going any farther here, assuming this is tested in libraries correctly. def test_load_study_information(self): """ Test a workflow that includes requests to pull in Study Details.""" self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("study_details") processor = self.get_processor(study, workflow_spec_model) processor.do_engine_steps() task = processor.bpmn_workflow.last_task self.assertIsNotNone(task.data) self.assertIn("StudyInfo", task.data) self.assertIn("info", task.data["StudyInfo"]) self.assertIn("title", task.data["StudyInfo"]["info"]) self.assertIn("last_updated", task.data["StudyInfo"]["info"]) self.assertIn("sponsor", task.data["StudyInfo"]["info"]) def test_spec_versioning(self): self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("decision_table") processor = self.get_processor(study, workflow_spec_model) self.assertTrue(processor.get_spec_version().startswith('v1.1')) file_service = FileService() file_service.add_workflow_spec_file(workflow_spec_model, "new_file.txt", "txt", b'blahblah') processor = self.get_processor(study, workflow_spec_model) self.assertTrue(processor.get_spec_version().startswith('v1.1.1')) file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'docx', 'docx.bpmn') file = open(file_path, "rb") data = file.read() file_model = db.session.query(FileModel).filter(FileModel.name == "decision_table.bpmn").first() file_service.update_file(file_model, data, "txt") processor = self.get_processor(study, workflow_spec_model) self.assertTrue(processor.get_spec_version().startswith('v2.1.1')) def test_restart_workflow(self): self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("two_forms") processor = self.get_processor(study, workflow_spec_model) self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id) task = processor.next_task() task.data = {"key": "Value"} processor.complete_task(task) task_before_restart = processor.next_task() processor.hard_reset() task_after_restart = processor.next_task() self.assertNotEqual(task.get_name(), task_before_restart.get_name()) self.assertEqual(task.get_name(), task_after_restart.get_name()) self.assertEqual(task.data, task_after_restart.data) def test_soft_reset(self): self.load_example_data() # Start the two_forms workflow, and enter some data in the first form. study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("two_forms") processor = self.get_processor(study, workflow_spec_model) self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id) task = processor.next_task() task.data = {"color": "blue"} processor.complete_task(task) # Modify the specification, with a minor text change. file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn') self.replace_file("two_forms.bpmn", file_path) # Setting up another processor should not error out, but doesn't pick up the update. processor.workflow_model.bpmn_workflow_json = processor.serialize() processor2 = WorkflowProcessor(processor.workflow_model) self.assertEqual("Step 1", processor2.bpmn_workflow.last_task.task_spec.description) self.assertNotEqual("# This is some documentation I wanted to add.", processor2.bpmn_workflow.last_task.task_spec.documentation) # You can do a soft update and get the right response. processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True) self.assertEqual("Step 1", processor3.bpmn_workflow.last_task.task_spec.description) self.assertEqual("# This is some documentation I wanted to add.", processor3.bpmn_workflow.last_task.task_spec.documentation) def test_hard_reset(self): self.load_example_data() # Start the two_forms workflow, and enter some data in the first form. study = session.query(StudyModel).first() workflow_spec_model = self.load_test_spec("two_forms") processor = self.get_processor(study, workflow_spec_model) self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id) task = processor.next_task() task.data = {"color": "blue"} processor.complete_task(task) next_task = processor.next_task() self.assertEqual("Step 2", next_task.task_spec.description) # Modify the specification, with a major change that alters the flow and can't be serialized effectively. file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn') self.replace_file("two_forms.bpmn", file_path) # Assure that creating a new processor doesn't cause any issues, and maintains the spec version. processor.workflow_model.bpmn_workflow_json = processor.serialize() processor2 = WorkflowProcessor(processor.workflow_model) self.assertTrue(processor2.get_spec_version().startswith("v1 ")) # Still at version 1. # Do a hard reset, which should bring us back to the beginning, but retain the data. processor3 = WorkflowProcessor(processor.workflow_model, hard_reset=True) self.assertEqual("Step 1", processor3.next_task().task_spec.description) self.assertEqual({"color": "blue"}, processor3.next_task().data) processor3.complete_task(processor3.next_task()) self.assertEqual("New Step", processor3.next_task().task_spec.description) self.assertEqual("blue", processor3.next_task().data["color"]) def test_get_latest_spec_version(self): workflow_spec_model = self.load_test_spec("two_forms") version = WorkflowProcessor.get_latest_version_string("two_forms") self.assertTrue(version.startswith("v1 ")) @patch('crc.services.protocol_builder.ProtocolBuilderService.get_studies') @patch('crc.services.protocol_builder.ProtocolBuilderService.get_investigators') @patch('crc.services.protocol_builder.ProtocolBuilderService.get_required_docs') @patch('crc.services.protocol_builder.ProtocolBuilderService.get_study_details') def test_master_bpmn(self, mock_details, mock_required_docs, mock_investigators, mock_studies): # Mock Protocol Builder response studies_response = self.protocol_builder_response('user_studies.json') mock_studies.return_value = ProtocolBuilderStudySchema(many=True).loads(studies_response) investigators_response = self.protocol_builder_response('investigators.json') mock_investigators.return_value = json.loads(investigators_response) required_docs_response = self.protocol_builder_response('required_docs.json') mock_required_docs.return_value = json.loads(required_docs_response) details_response = self.protocol_builder_response('study_details.json') mock_details.return_value = json.loads(details_response) self.load_example_data() study = session.query(StudyModel).first() workflow_spec_model = db.session.query(WorkflowSpecModel).\ filter(WorkflowSpecModel.name == "top_level_workflow").first() self.assertIsNotNone(workflow_spec_model) processor = self.get_processor(study, workflow_spec_model) processor.do_engine_steps() self.assertTrue("Top level process is fully automatic.", processor.bpmn_workflow.is_completed()) data = processor.bpmn_workflow.last_task.data logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG) # It should mark Enter Core Data as required, because it is always required. self.assertTrue("enter_core_info" in data) self.assertEqual("required", data["enter_core_info"]) # It should mark Personnel as required, because StudyInfo.investigators is not empty. self.assertTrue("personnel" in data) self.assertEqual("required", data["personnel"]) # It should mark the sponsor funding source as disabled since the funding required (12) is not included in the required docs. self.assertTrue("sponsor_funding_source" in data) self.assertEqual("disabled", data["sponsor_funding_source"]) def test_enum_with_no_choices_raises_api_error(self): self.load_example_data() workflow_spec_model = self.load_test_spec("random_fact") study = session.query(StudyModel).first() processor = self.get_processor(study, workflow_spec_model) processor.do_engine_steps() tasks = processor.next_user_tasks() task = tasks[0] field = FormField() field.id = "test_enum_field" field.type = "enum" field.options = [] task.task_spec.form.fields.append(field) with self.assertRaises(ApiError): processor.populate_form_with_random_data(task)