diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index eeee7dc6..cdfa44ce 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1047,6 +1047,7 @@ class ProcessInstanceProcessor: bpmn_process_dict: dict, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None, + add_tasks_if_new_bpmn_process: bool = True, ) -> BpmnProcessModel: tasks = bpmn_process_dict.pop("tasks") bpmn_process_data = bpmn_process_dict.pop("data") @@ -1059,7 +1060,9 @@ class ProcessInstanceProcessor: elif self.process_instance_model.bpmn_process_id is not None: bpmn_process = self.process_instance_model.bpmn_process + bpmn_process_is_new = False if bpmn_process is None: + bpmn_process_is_new = True bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) bpmn_process.properties_json = bpmn_process_dict @@ -1087,25 +1090,27 @@ class ProcessInstanceProcessor: bpmn_process.parent_process_id = bpmn_process_parent.id db.session.add(bpmn_process) - for task_id, task_properties in tasks.items(): - task_data_dict = task_properties.pop("data") - state_int = task_properties["state"] + if bpmn_process_is_new and add_tasks_if_new_bpmn_process: + # if True: + for task_id, task_properties in tasks.items(): + task_data_dict = task_properties.pop("data") + state_int = task_properties["state"] - task_model = TaskModel.query.filter_by(guid=task_id).first() - if task_model is None: - # bpmn_process_identifier = task_properties['workflow_name'] - # bpmn_identifier = task_properties['task_spec'] - # - # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) - # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() - # if task_definition is None: - # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) - task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) - task_model.state = TaskStateNames[state_int] - task_model.properties_json = task_properties + task_model = TaskModel.query.filter_by(guid=task_id).first() + if task_model is None: + # bpmn_process_identifier = task_properties['workflow_name'] + # bpmn_identifier = task_properties['task_spec'] + # + # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) + # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() + # if task_definition is None: + # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) + task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) + task_model.state = TaskStateNames[state_int] + task_model.properties_json = task_properties - TaskService.update_task_data_on_task_model(task_model, task_data_dict) - db.session.add(task_model) + TaskService.update_task_data_on_task_model(task_model, task_data_dict) + db.session.add(task_model) return bpmn_process @@ -1115,7 +1120,6 @@ class ProcessInstanceProcessor: Expects the save method to commit it. """ bpmn_dict = json.loads(self.serialize()) - # with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict) bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") process_instance_data_dict = {} bpmn_spec_dict = {} @@ -1132,15 +1136,18 @@ class ProcessInstanceProcessor: # FIXME: Update tasks in the did_complete_task instead to set the final info. # We will need to somehow cache all tasks initially though before each task is run. # Maybe always do this for first run - just need to know it's the first run. - if self.process_instance_model.bpmn_process_id is None: - subprocesses = process_instance_data_dict.pop("subprocesses") - bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) - for subprocess_task_id, subprocess_properties in subprocesses.items(): - self._add_bpmn_process( - subprocess_properties, - bpmn_process_parent, - bpmn_process_guid=subprocess_task_id, - ) + # import pdb; pdb.set_trace() + # if self.process_instance_model.bpmn_process_id is None: + subprocesses = process_instance_data_dict.pop("subprocesses") + bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) + for subprocess_task_id, subprocess_properties in subprocesses.items(): + # import pdb; pdb.set_trace() + print(f"subprocess_task_id: {subprocess_task_id}") + self._add_bpmn_process( + subprocess_properties, + bpmn_process_parent, + bpmn_process_guid=subprocess_task_id, + ) def save(self) -> None: """Saves the current state of this processor to the database.""" @@ -1693,6 +1700,7 @@ class ProcessInstanceProcessor: secondary_engine_step_delegate=step_delegate, serializer=self._serializer, process_instance=self.process_instance_model, + add_bpmn_process=self._add_bpmn_process, ) execution_strategy = execution_strategy_named( execution_strategy_name, task_model_delegate diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 32dbade8..700f29ca 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,8 +1,10 @@ import json from hashlib import sha256 +from typing import Tuple +from typing import Any from typing import Optional -from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore +from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer, BpmnWorkflow # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskStateNames @@ -50,49 +52,60 @@ class TaskService: @classmethod def find_or_create_task_model_from_spiff_task( - cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel + cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, add_bpmn_process: Any ) -> TaskModel: spiff_task_guid = str(spiff_task.id) task_model: Optional[TaskModel] = TaskModel.query.filter_by( guid=spiff_task_guid ).first() if task_model is None: - bpmn_process = cls.task_bpmn_process(spiff_task, process_instance) - task_model = TaskModel( - guid=spiff_task_guid, bpmn_process_id=bpmn_process.id - ) - db.session.add(task_model) - db.session.commit() + bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer, add_bpmn_process) + task_model = TaskModel.query.filter_by( + guid=spiff_task_guid + ).first() + if task_model is None: + task_model = TaskModel( + guid=spiff_task_guid, bpmn_process_id=bpmn_process.id + ) + db.session.commit() return task_model @classmethod - def task_subprocess_guid(cls, spiff_task: SpiffTask) -> Optional[str]: + def task_subprocess(cls, spiff_task: SpiffTask) -> Optional[Tuple[str, BpmnWorkflow]]: top_level_workflow = spiff_task.workflow._get_outermost_workflow() my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of + my_sp = None my_sp_id = None if my_wf != top_level_workflow: # All the subprocesses are at the top level, so you can just compare them for sp_id, sp in top_level_workflow.subprocesses.items(): if sp == my_wf: + my_sp = sp my_sp_id = sp_id break - return my_sp_id + return (str(my_sp_id), my_sp) @classmethod def task_bpmn_process( - cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel + cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, add_bpmn_process: Any ) -> BpmnProcessModel: - subprocess_guid = cls.task_subprocess_guid(spiff_task) - if subprocess_guid is None: + subprocess_guid, subprocess = cls.task_subprocess(spiff_task) + if subprocess is None: # This is the top level workflow, which has no guid return process_instance.bpmn_process else: + # import pdb; pdb.set_trace() bpmn_process: Optional[BpmnProcessModel] = BpmnProcessModel.query.filter_by( guid=subprocess_guid ).first() + # import pdb; pdb.set_trace() if bpmn_process is None: - spiff_task_guid = str(spiff_task.id) - raise Exception( - f"Could not find bpmn_process for task {spiff_task_guid}" - ) + bpmn_process = add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance.bpmn_process, subprocess_guid, add_tasks_if_new_bpmn_process=True) + db.session.commit() + # spiff_task_guid = str(spiff_task.id) + # raise Exception( + # f"Could not find bpmn_process for task {spiff_task_guid}" + # ) return bpmn_process diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index bdd8ebee..04cac19f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,5 +1,6 @@ import logging import time +from typing import Any from typing import Callable from typing import List from typing import Optional @@ -50,12 +51,14 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, + add_bpmn_process: Any = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance self.current_task_model: Optional[TaskModel] = None self.serializer = serializer + self.add_bpmn_process = add_bpmn_process def should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. @@ -63,12 +66,13 @@ class TaskModelSavingDelegate(EngineStepDelegate): Use the bpmn_process_id to do this. """ return self.process_instance.bpmn_process_id is not None + # return False def will_complete_task(self, spiff_task: SpiffTask) -> None: if self.should_update_task_model(): self.current_task_model = ( TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, self.process_instance + spiff_task, self.process_instance, self.serializer, self.add_bpmn_process ) ) self.current_task_model.start_in_seconds = time.time() diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn new file mode 100644 index 00000000..d1b462f1 --- /dev/null +++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn @@ -0,0 +1,112 @@ + + + + + Flow_0stlaxe + + + Flow_1and8ze + + + + ## Hello + + Flow_1fktmf7 + Flow_09gjylo + + + + Flow_0stlaxe + Flow_1fktmf7 + set_in_top_level_script = 1 + + + + + + + Flow_09gjylo + Flow_1i7syph + + Flow_00k1tii + + + + Flow_1b4o55k + + + + Flow_00k1tii + Flow_1b4o55k + set_in_top_level_subprocess = 1 + + + + Flow_1i7syph + Flow_1and8ze + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn new file mode 100644 index 00000000..25b37c61 --- /dev/null +++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn @@ -0,0 +1,39 @@ + + + + + Flow_06g687y + + + + Flow_01e21r0 + + + + Flow_06g687y + Flow_01e21r0 + set_in_test_process_to_call_script = 1 + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 7fa0f13b..63b4fa4f 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -314,9 +314,8 @@ class TestProcessInstanceProcessor(BaseTest): assert finance_group is not None process_model = load_test_spec( - process_model_id="test_group/manual_task", - bpmn_file_name="manual_task.bpmn", - process_model_source_directory="manual_task", + process_model_id="test_group/manual_task_with_subprocesses", + process_model_source_directory="manual_task_with_subprocesses", ) process_instance = self.create_process_instance_from_process_model( process_model=process_model, user=initiator_user