import re from SpiffWorkflow.serializer.exceptions import MissingSpecError from lxml import etree import shlex from datetime import datetime from typing import List from SpiffWorkflow import Task as SpiffTask, WorkflowException from SpiffWorkflow.bpmn.BpmnScriptEngine import BpmnScriptEngine from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer from SpiffWorkflow.bpmn.specs.EndEvent import EndEvent from SpiffWorkflow.bpmn.workflow import BpmnWorkflow from SpiffWorkflow.camunda.parser.CamundaParser import CamundaParser from SpiffWorkflow.dmn.parser.BpmnDmnParser import BpmnDmnParser from SpiffWorkflow.exceptions import WorkflowTaskExecException from SpiffWorkflow.specs import WorkflowSpec import crc from crc import session, app from crc.api.common import ApiError from crc.models.file import FileDataModel, FileModel, FileType from crc.models.workflow import WorkflowStatus, WorkflowModel, WorkflowSpecDependencyFile from crc.scripts.script import Script from crc.services.file_service import FileService from crc import app class CustomBpmnScriptEngine(BpmnScriptEngine): """This is a custom script processor that can be easily injected into Spiff Workflow. Rather than execute arbitrary code, this assumes the script references a fully qualified python class such as myapp.RandomFact. """ def execute(self, task: SpiffTask, script, data): """ Functions in two modes. 1. If the command is proceeded by #! then this is assumed to be a python script, and will attempt to load that python module and execute the do_task method on that script. Scripts must be located in the scripts package and they must extend the script.py class. 2. If not proceeded by the #! this will attempt to execute the script directly and assumes it is valid Python. """ # Shlex splits the whole string while respecting double quoted strings within study_id = task.workflow.data[WorkflowProcessor.STUDY_ID_KEY] if WorkflowProcessor.WORKFLOW_ID_KEY in task.workflow.data: workflow_id = task.workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] else: workflow_id = None if not script.startswith('#!'): try: augmentMethods = {'studyInfo':lambda *args: crc.scripts.study_info.StudyInfo.return_data( crc.scripts.study_info.StudyInfo(), task, study_id, workflow_id, *args)} super().execute(task, script, data,externalMethods=augmentMethods) except SyntaxError as e: raise ApiError.from_task('syntax_error', f'If you are running a pre-defined script, please' f' proceed the script with "#!", otherwise this is assumed to be' f' pure python: {script}, {e.msg}', task=task) else: self.run_predefined_script(task, script[2:], data) # strip off the first two characters. def run_predefined_script(self, task: SpiffTask, script, data): commands = shlex.split(script) path_and_command = commands[0].rsplit(".", 1) if len(path_and_command) == 1: module_name = "crc.scripts." + self.camel_to_snake(path_and_command[0]) class_name = path_and_command[0] else: module_name = "crc.scripts." + path_and_command[0] + "." + self.camel_to_snake(path_and_command[1]) class_name = path_and_command[1] try: mod = __import__(module_name, fromlist=[class_name]) klass = getattr(mod, class_name) study_id = task.workflow.data[WorkflowProcessor.STUDY_ID_KEY] if WorkflowProcessor.WORKFLOW_ID_KEY in task.workflow.data: workflow_id = task.workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] else: workflow_id = None if not isinstance(klass(), Script): raise ApiError.from_task("invalid_script", "This is an internal error. The script '%s:%s' you called " % (module_name, class_name) + "does not properly implement the CRC Script class.", task=task) if task.workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY]: """If this is running a validation, and not a normal process, then we want to mimic running the script, but not make any external calls or database changes.""" klass().do_task_validate_only(task, study_id, workflow_id, *commands[1:]) else: klass().do_task(task, study_id, workflow_id, *commands[1:]) except ModuleNotFoundError: raise ApiError.from_task("invalid_script", "Unable to locate Script: '%s:%s'" % (module_name, class_name), task=task) def evaluate_expression(self, task, expression): """ Evaluate the given expression, within the context of the given task and return the result. """ exp, valid = self.validateExpression(expression) return self._eval(exp, **task.data) @staticmethod def camel_to_snake(camel): camel = camel.strip() return re.sub(r'(? 0: return WorkflowStatus.user_input_required else: return WorkflowStatus.waiting def hard_reset(self): """Recreate this workflow. This will be useful when a workflow specification changes. """ self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id) new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id) new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine) new_bpmn_workflow.data = self.bpmn_workflow.data try: new_bpmn_workflow.do_engine_steps() except WorkflowException as we: raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender) self.bpmn_workflow = new_bpmn_workflow def get_status(self): return self.status_of(self.bpmn_workflow) def do_engine_steps(self): try: self.bpmn_workflow.do_engine_steps() except WorkflowTaskExecException as we: raise ApiError.from_task("task_error", str(we), we.task) def serialize(self): return self._serializer.serialize_workflow(self.bpmn_workflow) def next_user_tasks(self): return self.bpmn_workflow.get_ready_user_tasks() def next_task(self): """Returns the next task that should be completed even if there are parallel tasks and multiple options are available. If the workflow is complete it will return the final end task. """ # If the whole blessed mess is done, return the end_event task in the tree if self.bpmn_workflow.is_completed(): for task in SpiffTask.Iterator(self.bpmn_workflow.task_tree, SpiffTask.ANY_MASK): if isinstance(task.task_spec, EndEvent): return task # If there are ready tasks to complete, return the next ready task, but return the one # in the active parallel path if possible. ready_tasks = self.bpmn_workflow.get_tasks(SpiffTask.READY) if len(ready_tasks) > 0: for task in ready_tasks: if task.parent == self.bpmn_workflow.last_task: return task return ready_tasks[0] # If there are no ready tasks, but the thing isn't complete yet, find the first non-complete task # and return that next_task = None for task in SpiffTask.Iterator(self.bpmn_workflow.task_tree, SpiffTask.NOT_FINISHED_MASK): next_task = task return next_task def previous_task(self): return None def complete_task(self, task): self.bpmn_workflow.complete_task_from_id(task.id) def get_data(self): return self.bpmn_workflow.data def get_workflow_id(self): return self.workflow_model.id def get_study_id(self): return self.bpmn_workflow.data[self.STUDY_ID_KEY] def get_ready_user_tasks(self): return self.bpmn_workflow.get_ready_user_tasks() def get_current_user_tasks(self): """Return a list of all user tasks that are READY or COMPLETE and are parallel to the READY Task.""" ready_tasks = self.bpmn_workflow.get_ready_user_tasks() additional_tasks = [] if len(ready_tasks) > 0: for child in ready_tasks[0].parent.children: if child.state == SpiffTask.COMPLETED: additional_tasks.append(child) return ready_tasks + additional_tasks def get_all_user_tasks(self): all_tasks = self.bpmn_workflow.get_tasks(SpiffTask.ANY_MASK) return [t for t in all_tasks if not self.bpmn_workflow._is_engine_task(t.task_spec)] def get_all_completed_tasks(self): all_tasks = self.bpmn_workflow.get_tasks(SpiffTask.ANY_MASK) return [t for t in all_tasks if not self.bpmn_workflow._is_engine_task(t.task_spec) and t.state in [t.COMPLETED, t.CANCELLED]] def get_nav_item(self, task): for nav_item in self.bpmn_workflow.get_nav_list(): if nav_item['task_id'] == task.id: return nav_item def find_task_and_field_by_field_id(self, field_id): """Tracks down a form field by name in the workflow spec, only looks at ready tasks. Returns a tuple of the task, and form""" for spiff_task in self.bpmn_workflow.get_tasks(): if hasattr(spiff_task.task_spec, "form"): for field in spiff_task.task_spec.form.fields: if field.id == field_id: return spiff_task, field raise ApiError("invalid_field", "Unable to find a task in the workflow with a lookup field called: %s" % field_id)