From 9e91ff8f912a60b8ddf5c82302afad10730df7fd Mon Sep 17 00:00:00 2001 From: jasquat Date: Mon, 27 Mar 2023 10:37:31 -0400 Subject: [PATCH 01/11] process children and tasks of parent subprocesses instead of looking for all tasks with a given state w/ burnettk --- .../services/process_instance_processor.py | 208 +++++++++--------- .../services/task_service.py | 4 +- .../services/workflow_execution_service.py | 28 ++- .../src/routes/ProcessInstanceShow.tsx | 33 +-- 4 files changed, 149 insertions(+), 124 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 722baa0d0..99f60a2cb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,5 +1,7 @@ """Process_instance_processor.""" import _strptime # type: ignore +from sqlalchemy import or_ +from sqlalchemy import and_ import decimal import json import logging @@ -1263,109 +1265,109 @@ class ProcessInstanceProcessor: cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False ) -> None: """Reset a process to an earlier state.""" - raise Exception("This feature to reset a process instance to a given task is currently unavaiable") - # cls.add_event_to_process_instance( - # process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid - # ) - # - # to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first() - # if to_task_model is None: - # raise TaskNotFoundError( - # f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - # ) - # - # parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( - # to_task_model - # ) - # [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] - # [p.id for p in parent_bpmn_processes] - # tasks_to_update_query = db.session.query(TaskModel).filter( - # and_( - # or_( - # TaskModel.end_in_seconds > to_task_model.end_in_seconds, - # TaskModel.end_in_seconds.is_(None), # type: ignore - # ), - # TaskModel.process_instance_id == process_instance.id, - # # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore - # ) - # ) - # tasks_to_update = tasks_to_update_query.all() - # - # # run all queries before making changes to task_model - # if commit: - # # tasks_to_delete_query = db.session.query(TaskModel).filter( - # # and_( - # # or_( - # # TaskModel.end_in_seconds > to_task_model.end_in_seconds, - # # TaskModel.end_in_seconds.is_not(None), # type: ignore - # # ), - # # TaskModel.process_instance_id == process_instance.id, - # # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore - # # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore - # # ) - # # ) - # # - # # tasks_to_delete = tasks_to_delete_query.all() - # # - # # # delete any later tasks from to_task_model and delete bpmn processes that may be - # # # link directly to one of those tasks. - # # tasks_to_delete_guids = [t.guid for t in tasks_to_delete] - # # tasks_to_delete_ids = [t.id for t in tasks_to_delete] - # # bpmn_processes_to_delete = BpmnProcessModel.query.filter( - # # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore - # # ).order_by(BpmnProcessModel.id.desc()).all() - # # human_tasks_to_delete = HumanTaskModel.query.filter( - # # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore - # # ).all() - # # - # # - # # import pdb; pdb.set_trace() - # # # ensure the correct order for foreign keys - # # for human_task_to_delete in human_tasks_to_delete: - # # db.session.delete(human_task_to_delete) - # # db.session.commit() - # # for task_to_delete in tasks_to_delete: - # # db.session.delete(task_to_delete) - # # db.session.commit() - # # for bpmn_process_to_delete in bpmn_processes_to_delete: - # # db.session.delete(bpmn_process_to_delete) - # # db.session.commit() - # - # related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first() - # if related_human_task is not None: - # db.session.delete(related_human_task) - # - # tasks_to_update_ids = [t.id for t in tasks_to_update] - # human_tasks_to_delete = HumanTaskModel.query.filter( - # HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore - # ).all() - # for human_task_to_delete in human_tasks_to_delete: - # db.session.delete(human_task_to_delete) - # db.session.commit() - # - # for task_to_update in tasks_to_update: - # TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) - # - # parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() - # if parent_task_model is None: - # raise TaskNotFoundError( - # f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - # ) - # - # TaskService.reset_task_model( - # to_task_model, - # state="READY", - # json_data_hash=parent_task_model.json_data_hash, - # python_env_data_hash=parent_task_model.python_env_data_hash, - # commit=commit, - # ) - # for task_model in task_models_of_parent_bpmn_processes: - # TaskService.reset_task_model(task_model, state="WAITING", commit=commit) - # - # if commit: - # processor = ProcessInstanceProcessor(process_instance) - # processor.save() - # processor.suspend() + # raise Exception("This feature to reset a process instance to a given task is currently unavaiable") + cls.add_event_to_process_instance( + process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid + ) + + to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first() + if to_task_model is None: + raise TaskNotFoundError( + f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" + ) + + parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( + to_task_model + ) + [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] + [p.id for p in parent_bpmn_processes] + tasks_to_update_query = db.session.query(TaskModel).filter( + and_( + or_( + TaskModel.end_in_seconds > to_task_model.end_in_seconds, + TaskModel.end_in_seconds.is_(None), # type: ignore + ), + TaskModel.process_instance_id == process_instance.id, + # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore + ) + ) + tasks_to_update = tasks_to_update_query.all() + + # run all queries before making changes to task_model + if commit: + # tasks_to_delete_query = db.session.query(TaskModel).filter( + # and_( + # or_( + # TaskModel.end_in_seconds > to_task_model.end_in_seconds, + # TaskModel.end_in_seconds.is_not(None), # type: ignore + # ), + # TaskModel.process_instance_id == process_instance.id, + # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore + # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore + # ) + # ) + # + # tasks_to_delete = tasks_to_delete_query.all() + # + # # delete any later tasks from to_task_model and delete bpmn processes that may be + # # link directly to one of those tasks. + # tasks_to_delete_guids = [t.guid for t in tasks_to_delete] + # tasks_to_delete_ids = [t.id for t in tasks_to_delete] + # bpmn_processes_to_delete = BpmnProcessModel.query.filter( + # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore + # ).order_by(BpmnProcessModel.id.desc()).all() + # human_tasks_to_delete = HumanTaskModel.query.filter( + # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore + # ).all() + # + # + # import pdb; pdb.set_trace() + # # ensure the correct order for foreign keys + # for human_task_to_delete in human_tasks_to_delete: + # db.session.delete(human_task_to_delete) + # db.session.commit() + # for task_to_delete in tasks_to_delete: + # db.session.delete(task_to_delete) + # db.session.commit() + # for bpmn_process_to_delete in bpmn_processes_to_delete: + # db.session.delete(bpmn_process_to_delete) + # db.session.commit() + + related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first() + if related_human_task is not None: + db.session.delete(related_human_task) + + tasks_to_update_ids = [t.id for t in tasks_to_update] + human_tasks_to_delete = HumanTaskModel.query.filter( + HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore + ).all() + for human_task_to_delete in human_tasks_to_delete: + db.session.delete(human_task_to_delete) + db.session.commit() + + for task_to_update in tasks_to_update: + TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) + + parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() + if parent_task_model is None: + raise TaskNotFoundError( + f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" + ) + + TaskService.reset_task_model( + to_task_model, + state="READY", + json_data_hash=parent_task_model.json_data_hash, + python_env_data_hash=parent_task_model.python_env_data_hash, + commit=commit, + ) + for task_model in task_models_of_parent_bpmn_processes: + TaskService.reset_task_model(task_model, state="WAITING", commit=commit) + + if commit: + processor = ProcessInstanceProcessor(process_instance) + processor.save() + processor.suspend() @staticmethod def get_parser() -> MyCustomParser: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index d3cf545c5..4b86eefc3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -110,9 +110,9 @@ class TaskService: for sp_id, sp in top_level_workflow.subprocesses.items(): if sp == my_wf: my_sp = sp - my_sp_id = sp_id + my_sp_id = str(sp_id) break - return (str(my_sp_id), my_sp) + return (my_sp_id, my_sp) @classmethod def task_bpmn_process( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 4d9334183..99ef4ee6d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,5 +1,6 @@ import logging import time +from uuid import UUID from typing import Callable from typing import Optional @@ -67,6 +68,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.task_models: dict[str, TaskModel] = {} self.json_data_dicts: dict[str, JsonDataDict] = {} self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} + self.last_completed_spiff_task: Optional[SpiffTask] = None def will_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): @@ -81,6 +83,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend") task_model.start_in_seconds = self.current_task_start_in_seconds task_model.end_in_seconds = time.time() + self.last_completed_spiff_task= spiff_task if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) @@ -104,10 +107,27 @@ class TaskModelSavingDelegate(EngineStepDelegate): # TODO: also include children of the last task processed. This may help with task resets # if we have to set their states to FUTURE. # excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion. - for waiting_spiff_task in bpmn_process_instance.get_tasks( - TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY - ): - self._update_task_model_with_spiff_task(waiting_spiff_task) + # for waiting_spiff_task in bpmn_process_instance.get_tasks( + # TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY + # ): + # self._update_task_model_with_spiff_task(waiting_spiff_task) + if self.last_completed_spiff_task is not None: + self._process_spiff_task_children(self.last_completed_spiff_task) + self._process_spiff_task_parents(self.last_completed_spiff_task) + + def _process_spiff_task_children(self, spiff_task: SpiffTask) -> None: + for child_spiff_task in spiff_task.children: + self._update_task_model_with_spiff_task(child_spiff_task) + self._process_spiff_task_children(child_spiff_task) + + def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None: + (parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task) + if parent_subprocess_guid is not None: + spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task(UUID(parent_subprocess_guid)) + + if spiff_task_of_parent_subprocess is not None: + self._update_task_model_with_spiff_task(spiff_task_of_parent_subprocess) + self._process_spiff_task_parents(spiff_task_of_parent_subprocess) def _should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. diff --git a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx index feaa41737..eaf909556 100644 --- a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx +++ b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx @@ -260,7 +260,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { return !taskToTimeTravelTo; }; - const completionViewLink = (label: any, taskGuid: string) => { + const queryParams = () => { const processIdentifier = searchParams.get('process_identifier'); const callActivityTaskId = searchParams.get('bpmn_process_guid'); const queryParamArray = []; @@ -270,16 +270,19 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { if (callActivityTaskId) { queryParamArray.push(`bpmn_process_guid=${callActivityTaskId}`); } - let queryParams = ''; + let queryParamString = ''; if (queryParamArray.length > 0) { - queryParams = `?${queryParamArray.join('&')}`; + queryParamString = `?${queryParamArray.join('&')}`; } + return queryParamString; + }; + const completionViewLink = (label: any, taskGuid: string) => { return ( {label} @@ -287,7 +290,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { }; const returnToProcessInstance = () => { - window.location.href = processInstanceShowPageBaseUrl; + window.location.href = `${processInstanceShowPageBaseUrl}${queryParams()}`; }; const resetProcessInstance = () => { @@ -671,16 +674,16 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { ); }; - const canResetProcess = (_task: Task) => { - // disabling this feature for now - return false; - // return ( - // ability.can('POST', targetUris.processInstanceResetPath) && - // processInstance && - // processInstance.status === 'suspended' && - // task.state === 'READY' && - // !showingActiveTask() - // ); + const canResetProcess = (task: Task) => { + // // disabling this feature for now + // return false; + return ( + ability.can('POST', targetUris.processInstanceResetPath) && + processInstance && + processInstance.status === 'suspended' && + task.state === 'READY' && + !showingActiveTask() + ); }; const getEvents = (task: Task) => { From 54b1a43494c1ebdcf9202a527034838cbebd4fc1 Mon Sep 17 00:00:00 2001 From: burnettk Date: Tue, 28 Mar 2023 08:04:42 -0400 Subject: [PATCH 02/11] lint --- .../services/process_instance_processor.py | 4 ++-- .../services/workflow_execution_service.py | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 54ee33b2e..76b3d9ac6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,7 +1,5 @@ """Process_instance_processor.""" import _strptime # type: ignore -from sqlalchemy import or_ -from sqlalchemy import and_ import decimal import json import logging @@ -53,6 +51,8 @@ from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ign from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore +from sqlalchemy import and_ +from sqlalchemy import or_ from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 99ef4ee6d..ac0f1a41d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,8 +1,8 @@ import logging import time -from uuid import UUID from typing import Callable from typing import Optional +from uuid import UUID from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore @@ -83,7 +83,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend") task_model.start_in_seconds = self.current_task_start_in_seconds task_model.end_in_seconds = time.time() - self.last_completed_spiff_task= spiff_task + self.last_completed_spiff_task = spiff_task if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) @@ -123,7 +123,9 @@ class TaskModelSavingDelegate(EngineStepDelegate): def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None: (parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task) if parent_subprocess_guid is not None: - spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task(UUID(parent_subprocess_guid)) + spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task( + UUID(parent_subprocess_guid) + ) if spiff_task_of_parent_subprocess is not None: self._update_task_model_with_spiff_task(spiff_task_of_parent_subprocess) From d2f1ca1492fc3d3861a4dae16b6aaee2d0ca7062 Mon Sep 17 00:00:00 2001 From: jasquat Date: Tue, 28 Mar 2023 08:23:09 -0400 Subject: [PATCH 03/11] some more attempts to get reset working --- .../services/process_instance_processor.py | 13 ++ .../services/task_service.py | 3 +- .../unit/test_process_instance_processor.py | 162 ++++++++++++------ 3 files changed, 123 insertions(+), 55 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 99f60a2cb..c08a45313 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,4 +1,5 @@ """Process_instance_processor.""" +import copy import _strptime # type: ignore from sqlalchemy import or_ from sqlalchemy import and_ @@ -1346,7 +1347,12 @@ class ProcessInstanceProcessor: db.session.commit() for task_to_update in tasks_to_update: + # print(f"task_to_update: {task_to_update}") TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) + # if task_to_update.task_definition.bpmn_identifier != 'top_level_process_script_after_gate': + # TaskService.reset_task_model(task_to_update, state='FUTURE', commit=commit) + # else: + # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() if parent_task_model is None: @@ -1364,6 +1370,13 @@ class ProcessInstanceProcessor: for task_model in task_models_of_parent_bpmn_processes: TaskService.reset_task_model(task_model, state="WAITING", commit=commit) + bpmn_process = to_task_model.bpmn_process + properties_json = copy.copy(bpmn_process.properties_json) + properties_json['last_task'] = parent_task_model.guid + bpmn_process.properties_json = properties_json + db.session.add(bpmn_process) + db.session.commit() + if commit: processor = ProcessInstanceProcessor(process_instance) processor.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 4b86eefc3..c9299925c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,3 +1,4 @@ +import copy import json from hashlib import sha256 from typing import Optional @@ -360,7 +361,7 @@ class TaskService: else: task_model.python_env_data_hash = python_env_data_hash - new_properties_json = task_model.properties_json + new_properties_json = copy.copy(task_model.properties_json) task_model.state = state task_model.start_in_seconds = None task_model.end_in_seconds = None diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 377091971..7ef7b40f7 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -256,60 +256,114 @@ class TestProcessInstanceProcessor(BaseTest): assert spiff_task is not None assert spiff_task.state == TaskState.COMPLETED - # TODO: FIX resetting a process instance to a task - # def test_properly_resets_process_to_given_task( - # self, - # app: Flask, - # client: FlaskClient, - # with_db_and_bpmn_file_cleanup: None, - # with_super_admin_user: UserModel, - # ) -> None: - # self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") - # initiator_user = self.find_or_create_user("initiator_user") - # finance_user_three = self.find_or_create_user("testuser3") - # assert initiator_user.principal is not None - # assert finance_user_three.principal is not None - # AuthorizationService.import_permissions_from_yaml_file() - # - # finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() - # assert finance_group is not None - # - # process_model = load_test_spec( - # process_model_id="test_group/manual_task_with_subprocesses", - # process_model_source_directory="manual_task_with_subprocesses", - # ) - # process_instance = self.create_process_instance_from_process_model( - # process_model=process_model, user=initiator_user - # ) - # processor = ProcessInstanceProcessor(process_instance) - # processor.do_engine_steps(save=True) - # assert len(process_instance.active_human_tasks) == 1 - # initial_human_task_id = process_instance.active_human_tasks[0].id - # - # # save again to ensure we go attempt to process the human tasks again - # processor.save() - # - # assert len(process_instance.active_human_tasks) == 1 - # assert initial_human_task_id == process_instance.active_human_tasks[0].id - # - # processor = ProcessInstanceProcessor(process_instance) - # human_task_one = process_instance.active_human_tasks[0] - # spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( - # human_task_one.task_name, processor.bpmn_process_instance - # ) - # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - # - # processor.suspend() - # ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True) - # - # process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - # processor = ProcessInstanceProcessor(process_instance) - # human_task_one = process_instance.active_human_tasks[0] - # spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) - # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - # human_task_one = process_instance.active_human_tasks[0] - # spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) - # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + def test_properly_resets_process_to_given_task( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") + initiator_user = self.find_or_create_user("initiator_user") + finance_user_three = self.find_or_create_user("testuser3") + assert initiator_user.principal is not None + assert finance_user_three.principal is not None + AuthorizationService.import_permissions_from_yaml_file() + + finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() + assert finance_group is not None + + process_model = load_test_spec( + process_model_id="test_group/manual_task", + process_model_source_directory="manual_task", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=initiator_user + ) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + assert len(process_instance.active_human_tasks) == 1 + initial_human_task_id = process_instance.active_human_tasks[0].id + + # save again to ensure we go attempt to process the human tasks again + processor.save() + + assert len(process_instance.active_human_tasks) == 1 + assert initial_human_task_id == process_instance.active_human_tasks[0].id + + processor = ProcessInstanceProcessor(process_instance) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( + human_task_one.task_name, processor.bpmn_process_instance + ) + + processor.suspend() + ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) + + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + processor = ProcessInstanceProcessor(process_instance) + processor.resume() + processor.do_engine_steps(save=True) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + assert process_instance.status == "complete" + + def test_properly_resets_process_to_given_task_with_call_activity( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") + initiator_user = self.find_or_create_user("initiator_user") + finance_user_three = self.find_or_create_user("testuser3") + assert initiator_user.principal is not None + assert finance_user_three.principal is not None + AuthorizationService.import_permissions_from_yaml_file() + + finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() + assert finance_group is not None + + process_model = load_test_spec( + process_model_id="test_group/manual_task_with_subprocesses", + process_model_source_directory="manual_task_with_subprocesses", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=initiator_user + ) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + import pdb; pdb.set_trace() + assert len(process_instance.active_human_tasks) == 1 + initial_human_task_id = process_instance.active_human_tasks[0].id + + # save again to ensure we go attempt to process the human tasks again + processor.save() + + assert len(process_instance.active_human_tasks) == 1 + assert initial_human_task_id == process_instance.active_human_tasks[0].id + + processor = ProcessInstanceProcessor(process_instance) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( + human_task_one.task_name, processor.bpmn_process_instance + ) + + processor.suspend() + ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) + import pdb; pdb.set_trace() + + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + processor = ProcessInstanceProcessor(process_instance) + processor.resume() + processor.do_engine_steps(save=True) + import pdb; pdb.set_trace() + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( self, From 8bd7716d4c5067ff5e0e6e6ff02981e81dcc4c7b Mon Sep 17 00:00:00 2001 From: jasquat Date: Tue, 28 Mar 2023 15:07:31 -0400 Subject: [PATCH 04/11] some more debugging --- .../services/process_instance_processor.py | 1 + .../services/task_service.py | 21 ++--- .../services/workflow_execution_service.py | 19 ++++- .../manual_task_with_subprocesses.bpmn | 83 +++++++++++-------- .../unit/test_process_instance_processor.py | 25 +++--- 5 files changed, 82 insertions(+), 67 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 0ec6c1064..7d33fc4e8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1349,6 +1349,7 @@ class ProcessInstanceProcessor: for task_to_update in tasks_to_update: # print(f"task_to_update: {task_to_update}") TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) + # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) # if task_to_update.task_definition.bpmn_identifier != 'top_level_process_script_after_gate': # TaskService.reset_task_model(task_to_update, state='FUTURE', commit=commit) # else: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index be6407b58..a67a7755d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -59,6 +59,8 @@ class TaskService: It also returns the relating json_data object so they can be imported later. """ new_properties_json = serializer.task_to_dict(spiff_task) + if new_properties_json["task_spec"] == "Start": + new_properties_json["parent"] = None spiff_task_data = new_properties_json.pop("data") python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task, serializer) task_model.properties_json = new_properties_json @@ -251,11 +253,7 @@ class TaskService: # bpmn process defintion so let's avoid using it. if task_properties["task_spec"] == "Root": continue - if task_properties["task_spec"] == "Start": - task_properties["parent"] = None - task_data_dict = task_properties.pop("data") - state_int = task_properties["state"] spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) task_model = TaskModel.query.filter_by(guid=task_id).first() @@ -266,23 +264,14 @@ class TaskService: spiff_task, bpmn_definition_to_task_definitions_mappings, ) - task_model.state = TaskStateNames[state_int] - task_model.properties_json = task_properties - new_task_models[task_model.guid] = task_model - json_data_dict = TaskService.update_task_data_on_task_model( - task_model, task_data_dict, "json_data_hash" - ) + json_data_dict, python_env_dict = cls.update_task_model(task_model, spiff_task, serializer) + + new_task_models[task_model.guid] = task_model if json_data_dict is not None: new_json_data_dicts[json_data_dict["hash"]] = json_data_dict - - python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task, serializer) - python_env_dict = TaskService.update_task_data_on_task_model( - task_model, python_env_data_dict, "python_env_data_hash" - ) if python_env_dict is not None: new_json_data_dicts[python_env_dict["hash"]] = python_env_dict - return (bpmn_process, new_task_models, new_json_data_dicts) @classmethod diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index ac0f1a41d..cc6a3a02f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -11,6 +11,8 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.models import task_definition +from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( @@ -19,7 +21,8 @@ from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType -from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +from spiffworkflow_backend.models.task import TaskModel +from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401 from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.process_instance_lock_service import ( ProcessInstanceLockService, @@ -93,6 +96,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): failing_spiff_task = script_engine.failing_spiff_task self._update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) + # import pdb; pdb.set_trace() db.session.bulk_save_objects(self.task_models.values()) db.session.bulk_save_objects(self.process_instance_events.values()) @@ -123,7 +127,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None: (parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task) if parent_subprocess_guid is not None: - spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task( + spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( UUID(parent_subprocess_guid) ) @@ -156,6 +160,17 @@ class TaskModelSavingDelegate(EngineStepDelegate): bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process( bpmn_process or task_model.bpmn_process, spiff_task.workflow.data ) + # stp = False + # for ntm in new_task_models.values(): + # td = TaskDefinitionModel.query.filter_by(id=ntm.task_definition_id).first() + # if td.bpmn_identifier == 'Start': + # # import pdb; pdb.set_trace() + # stp = True + # print("HEY") + + # if stp: + # # import pdb; pdb.set_trace() + # print("HEY2") self.task_models.update(new_task_models) self.json_data_dicts.update(new_json_data_dicts) json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer) diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn index f49f99cd9..d2b1d94e5 100644 --- a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn +++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn @@ -7,12 +7,12 @@ Flow_1ygcsbt - + ## Hello - Flow_1fktmf7 Flow_1t9ywmr + Flow_0q30935 Flow_09gjylo @@ -21,8 +21,8 @@ Flow_1fktmf7 set_in_top_level_script = 1 - - + + Flow_09gjylo Flow_0yxus36 @@ -65,34 +65,43 @@ except: Flow_1ygcsbt set_top_level_process_script_after_gate = 1 - + + + + Flow_1fktmf7 + Flow_0q30935 + - - - - - - - - - - - - - - + + - + + + + + + + + + + + + + + + + + @@ -100,33 +109,37 @@ except: - + - - + + - - + + - - + + - - + + - - - - + + + + - - + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index b85bbec32..0675394b5 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -1,5 +1,6 @@ """Test_process_instance_processor.""" from uuid import UUID +import json import pytest from flask import g @@ -305,7 +306,7 @@ class TestProcessInstanceProcessor(BaseTest): processor.resume() processor.do_engine_steps(save=True) human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) assert process_instance.status == "complete" @@ -335,34 +336,30 @@ class TestProcessInstanceProcessor(BaseTest): ) processor = ProcessInstanceProcessor(process_instance) processor.do_engine_steps(save=True) - import pdb; pdb.set_trace() + with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) assert len(process_instance.active_human_tasks) == 1 initial_human_task_id = process_instance.active_human_tasks[0].id - - # save again to ensure we go attempt to process the human tasks again - processor.save() - assert len(process_instance.active_human_tasks) == 1 assert initial_human_task_id == process_instance.active_human_tasks[0].id - processor = ProcessInstanceProcessor(process_instance) human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( - human_task_one.task_name, processor.bpmn_process_instance - ) + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + processor.suspend() - ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) - import pdb; pdb.set_trace() + ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() processor = ProcessInstanceProcessor(process_instance) + with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) processor.resume() processor.do_engine_steps(save=True) - import pdb; pdb.set_trace() human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task(UUID(human_task_one.task_id)) + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + + import pdb; pdb.set_trace() assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( From 7d7e976b37454e9d240c5b2cd08579e448218ec5 Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 30 Mar 2023 11:15:27 -0400 Subject: [PATCH 05/11] added an init method to task service and move a lot of code from workflow execution to it and fixed up the task running test to check things more thoroughly --- .../services/process_instance_processor.py | 11 +- .../services/task_service.py | 140 ++++++++++++++++++ .../services/workflow_execution_service.py | 105 ++----------- .../manual_task_with_subprocesses.bpmn | 42 +++--- .../unit/test_process_instance_processor.py | 111 ++++++++++---- 5 files changed, 268 insertions(+), 141 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 518506bcb..1c32efb80 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,6 +1,6 @@ """Process_instance_processor.""" -import copy import _strptime # type: ignore +import copy import decimal import json import logging @@ -1373,7 +1373,7 @@ class ProcessInstanceProcessor: bpmn_process = to_task_model.bpmn_process properties_json = copy.copy(bpmn_process.properties_json) - properties_json['last_task'] = parent_task_model.guid + properties_json["last_task"] = parent_task_model.guid bpmn_process.properties_json = properties_json db.session.add(bpmn_process) db.session.commit() @@ -1818,6 +1818,13 @@ class ProcessInstanceProcessor: user_id=user.id, ) + task_service = TaskService( + process_instance=self.process_instance_model, + serializer=self._serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + ) + task_service.process_parents_and_children_and_save_to_database(spiff_task) + # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) self.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index a67a7755d..671d415ca 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,5 +1,6 @@ import copy import json +import time from hashlib import sha256 from typing import Optional from typing import Tuple @@ -20,6 +21,8 @@ from spiffworkflow_backend.models.bpmn_process import BpmnProcessNotFoundError from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel +from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.task import TaskModel # noqa: F401 @@ -31,6 +34,135 @@ class JsonDataDict(TypedDict): class TaskService: PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state" + def __init__( + self, + process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, + bpmn_definition_to_task_definitions_mappings: dict, + ) -> None: + self.process_instance = process_instance + self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings + self.serializer = serializer + + self.bpmn_processes: dict[str, BpmnProcessModel] = {} + self.task_models: dict[str, TaskModel] = {} + self.json_data_dicts: dict[str, JsonDataDict] = {} + self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} + + def save_objects_to_database(self) -> None: + db.session.bulk_save_objects(self.bpmn_processes.values()) + db.session.bulk_save_objects(self.task_models.values()) + db.session.bulk_save_objects(self.process_instance_events.values()) + self.__class__.insert_or_update_json_data_records(self.json_data_dicts) + + def process_parents_and_children_and_save_to_database( + self, + spiff_task: SpiffTask, + ) -> None: + self.process_spiff_task_children(spiff_task) + self.process_spiff_task_parents(spiff_task) + self.save_objects_to_database() + + def process_spiff_task_children( + self, + spiff_task: SpiffTask, + ) -> None: + for child_spiff_task in spiff_task.children: + self.update_task_model_with_spiff_task( + spiff_task=child_spiff_task, + ) + self.process_spiff_task_children( + spiff_task=child_spiff_task, + ) + + def process_spiff_task_parents( + self, + spiff_task: SpiffTask, + ) -> None: + (parent_subprocess_guid, _parent_subprocess) = self.__class__.task_subprocess(spiff_task) + if parent_subprocess_guid is not None: + spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( + UUID(parent_subprocess_guid) + ) + + if spiff_task_of_parent_subprocess is not None: + self.update_task_model_with_spiff_task( + spiff_task=spiff_task_of_parent_subprocess, + ) + self.process_spiff_task_parents( + spiff_task=spiff_task_of_parent_subprocess, + ) + + def update_task_model_with_spiff_task( + self, + spiff_task: SpiffTask, + task_failed: bool = False, + ) -> TaskModel: + ( + new_bpmn_process, + task_model, + new_task_models, + new_json_data_dicts, + ) = self.__class__.find_or_create_task_model_from_spiff_task( + spiff_task, + self.process_instance, + self.serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + ) + bpmn_process = new_bpmn_process or task_model.bpmn_process + bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process( + bpmn_process, spiff_task.workflow.data + ) + self.task_models.update(new_task_models) + self.json_data_dicts.update(new_json_data_dicts) + json_data_dict_list = self.__class__.update_task_model(task_model, spiff_task, self.serializer) + self.task_models[task_model.guid] = task_model + if bpmn_process_json_data is not None: + json_data_dict_list.append(bpmn_process_json_data) + self._update_json_data_dicts_using_list(json_data_dict_list, self.json_data_dicts) + + if task_model.state == "COMPLETED" or task_failed: + event_type = ProcessInstanceEventType.task_completed.value + if task_failed: + event_type = ProcessInstanceEventType.task_failed.value + + # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete + # which script tasks execute when READY. + timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() + process_instance_event = ProcessInstanceEventModel( + task_guid=task_model.guid, + process_instance_id=self.process_instance.id, + event_type=event_type, + timestamp=timestamp, + ) + self.process_instance_events[task_model.guid] = process_instance_event + + # self.update_bpmn_process(spiff_task.workflow, bpmn_process) + return task_model + + def update_bpmn_process( + self, + spiff_workflow: BpmnWorkflow, + bpmn_process: BpmnProcessModel, + ) -> None: + # import pdb; pdb.set_trace() + new_properties_json = copy.copy(bpmn_process.properties_json) + new_properties_json["last_task"] = str(spiff_workflow.last_task) if spiff_workflow.last_task else None + new_properties_json["success"] = spiff_workflow.success + bpmn_process.properties_json = new_properties_json + + bpmn_process_json_data = self.__class__.update_task_data_on_bpmn_process(bpmn_process, spiff_workflow.data) + if bpmn_process_json_data is not None: + self.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data + + self.bpmn_processes[bpmn_process.guid or "top_level"] = bpmn_process + + if spiff_workflow.outer_workflow != spiff_workflow: + direct_parent_bpmn_process = BpmnProcessModel.query.filter_by( + id=bpmn_process.direct_parent_process_id + ).first() + self.update_bpmn_process(spiff_workflow.outer_workflow, direct_parent_bpmn_process) + @classmethod def insert_or_update_json_data_records( cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] @@ -395,3 +527,11 @@ class TaskService: # this helps to convert items like datetime objects to be json serializable converted_data: dict = serializer.data_converter.convert(user_defined_state) return converted_data + + @classmethod + def _update_json_data_dicts_using_list( + cls, json_data_dict_list: list[Optional[JsonDataDict]], json_data_dicts: dict[str, JsonDataDict] + ) -> None: + for json_data_dict in json_data_dict_list: + if json_data_dict is not None: + json_data_dicts[json_data_dict["hash"]] = json_data_dict diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 88634456a..64e197ef2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,7 +1,6 @@ import time from typing import Callable from typing import Optional -from uuid import UUID from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore @@ -10,23 +9,18 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from spiffworkflow_backend.exceptions.api_error import ApiError -from spiffworkflow_backend.models import task_definition -from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( MessageInstanceCorrelationRuleModel, ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventModel -from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.models.task import TaskModel from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401 from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.process_instance_lock_service import ( ProcessInstanceLockService, ) -from spiffworkflow_backend.services.task_service import JsonDataDict from spiffworkflow_backend.services.task_service import TaskService @@ -67,11 +61,17 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.current_task_model: Optional[TaskModel] = None self.current_task_start_in_seconds: Optional[float] = None - self.task_models: dict[str, TaskModel] = {} - self.json_data_dicts: dict[str, JsonDataDict] = {} - self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} + # self.task_models: dict[str, TaskModel] = {} + # self.json_data_dicts: dict[str, JsonDataDict] = {} + # self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} self.last_completed_spiff_task: Optional[SpiffTask] = None + self.task_service = TaskService( + process_instance=self.process_instance, + serializer=self.serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + ) + def will_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): self.current_task_start_in_seconds = time.time() @@ -80,7 +80,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): def did_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): - task_model = self._update_task_model_with_spiff_task(spiff_task) + task_model = self.task_service.update_task_model_with_spiff_task(spiff_task) if self.current_task_start_in_seconds is None: raise Exception("Could not find cached current_task_start_in_seconds. This should never have happend") task_model.start_in_seconds = self.current_task_start_in_seconds @@ -93,13 +93,9 @@ class TaskModelSavingDelegate(EngineStepDelegate): script_engine = bpmn_process_instance.script_engine if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: failing_spiff_task = script_engine.failing_spiff_task - self._update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) + self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) - # import pdb; pdb.set_trace() - db.session.bulk_save_objects(self.task_models.values()) - db.session.bulk_save_objects(self.process_instance_events.values()) - - TaskService.insert_or_update_json_data_records(self.json_data_dicts) + self.task_service.save_objects_to_database() if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) @@ -115,24 +111,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): # ): # self._update_task_model_with_spiff_task(waiting_spiff_task) if self.last_completed_spiff_task is not None: - self._process_spiff_task_children(self.last_completed_spiff_task) - self._process_spiff_task_parents(self.last_completed_spiff_task) - - def _process_spiff_task_children(self, spiff_task: SpiffTask) -> None: - for child_spiff_task in spiff_task.children: - self._update_task_model_with_spiff_task(child_spiff_task) - self._process_spiff_task_children(child_spiff_task) - - def _process_spiff_task_parents(self, spiff_task: SpiffTask) -> None: - (parent_subprocess_guid, _parent_subprocess) = TaskService.task_subprocess(spiff_task) - if parent_subprocess_guid is not None: - spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( - UUID(parent_subprocess_guid) - ) - - if spiff_task_of_parent_subprocess is not None: - self._update_task_model_with_spiff_task(spiff_task_of_parent_subprocess) - self._process_spiff_task_parents(spiff_task_of_parent_subprocess) + self.task_service.process_spiff_task_parents(self.last_completed_spiff_task) + self.task_service.process_spiff_task_children(self.last_completed_spiff_task) def _should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. @@ -142,63 +122,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): # return self.process_instance.bpmn_process_id is not None return True - def _update_json_data_dicts_using_list(self, json_data_dict_list: list[Optional[JsonDataDict]]) -> None: - for json_data_dict in json_data_dict_list: - if json_data_dict is not None: - self.json_data_dicts[json_data_dict["hash"]] = json_data_dict - - def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask, task_failed: bool = False) -> TaskModel: - ( - bpmn_process, - task_model, - new_task_models, - new_json_data_dicts, - ) = TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, - self.process_instance, - self.serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - ) - bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process( - bpmn_process or task_model.bpmn_process, spiff_task.workflow.data - ) - # stp = False - # for ntm in new_task_models.values(): - # td = TaskDefinitionModel.query.filter_by(id=ntm.task_definition_id).first() - # if td.bpmn_identifier == 'Start': - # # import pdb; pdb.set_trace() - # stp = True - # print("HEY") - - # if stp: - # # import pdb; pdb.set_trace() - # print("HEY2") - self.task_models.update(new_task_models) - self.json_data_dicts.update(new_json_data_dicts) - json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer) - self.task_models[task_model.guid] = task_model - if bpmn_process_json_data is not None: - json_data_dict_list.append(bpmn_process_json_data) - self._update_json_data_dicts_using_list(json_data_dict_list) - - if task_model.state == "COMPLETED" or task_failed: - event_type = ProcessInstanceEventType.task_completed.value - if task_failed: - event_type = ProcessInstanceEventType.task_failed.value - - # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete - # which script tasks execute when READY. - timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() - process_instance_event = ProcessInstanceEventModel( - task_guid=task_model.guid, - process_instance_id=self.process_instance.id, - event_type=event_type, - timestamp=timestamp, - ) - self.process_instance_events[task_model.guid] = process_instance_event - - return task_model - class ExecutionStrategy: """Interface of sorts for a concrete execution strategy.""" diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn index d2b1d94e5..e7817523c 100644 --- a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn +++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn @@ -4,7 +4,7 @@ Flow_0stlaxe - + Flow_1ygcsbt @@ -23,7 +23,7 @@ - + Flow_09gjylo Flow_0yxus36 @@ -46,7 +46,7 @@ except: we_move_on = False - + Flow_0yxus36 Flow_187mcqe @@ -60,7 +60,7 @@ except: we_move_on == True - + Flow_0lw7sda Flow_1ygcsbt set_top_level_process_script_after_gate = 1 @@ -78,30 +78,36 @@ except: - - - - - - - - - - - - + + + - - + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 0675394b5..01624597d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -1,12 +1,12 @@ """Test_process_instance_processor.""" from uuid import UUID -import json import pytest from flask import g from flask.app import Flask from flask.testing import FlaskClient -from SpiffWorkflow.task import TaskState # type: ignore +from SpiffWorkflow.task import Task as SpiffTask # type: ignore +from SpiffWorkflow.task import TaskState from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec @@ -16,6 +16,7 @@ from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +from spiffworkflow_backend.models.task_definition import TaskDefinitionModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import ( @@ -297,6 +298,7 @@ class TestProcessInstanceProcessor(BaseTest): spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( human_task_one.task_name, processor.bpmn_process_instance ) + assert spiff_manual_task is not None processor.suspend() ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) @@ -336,7 +338,7 @@ class TestProcessInstanceProcessor(BaseTest): ) processor = ProcessInstanceProcessor(process_instance) processor.do_engine_steps(save=True) - with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) + # with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) assert len(process_instance.active_human_tasks) == 1 initial_human_task_id = process_instance.active_human_tasks[0].id assert len(process_instance.active_human_tasks) == 1 @@ -346,20 +348,21 @@ class TestProcessInstanceProcessor(BaseTest): spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - processor.suspend() ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() processor = ProcessInstanceProcessor(process_instance) - with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) + # with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) processor.resume() processor.do_engine_steps(save=True) human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - import pdb; pdb.set_trace() + import pdb + + pdb.set_trace() assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( @@ -409,6 +412,9 @@ class TestProcessInstanceProcessor(BaseTest): human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) # recreate variables to ensure all bpmn json was recreated from scratch from the db process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() @@ -429,34 +435,74 @@ class TestProcessInstanceProcessor(BaseTest): }, } fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}} - fifth_data_set = {**fourth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}} + fifth_data_set = {**fourth_data_set, **{"set_top_level_process_script_after_gate": 1}} + sixth_data_set = {**fifth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}} expected_task_data = { - "top_level_script": first_data_set, - "manual_task": first_data_set, - "top_level_subprocess_script": second_data_set, - "top_level_subprocess": second_data_set, - "test_process_to_call_subprocess_script": third_data_set, - "top_level_call_activity": third_data_set, - "end_event_of_manual_task_model": third_data_set, - "top_level_subprocess_script_second": fourth_data_set, - "test_process_to_call_subprocess_script_second": fourth_data_set, + "top_level_script": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_manual_task_one": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_manual_task_two": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_subprocess_script": { + "data": second_data_set, + "bpmn_process_identifier": "top_level_subprocess", + }, + "top_level_subprocess": {"data": second_data_set, "bpmn_process_identifier": "top_level_process"}, + "test_process_to_call_subprocess_script": { + "data": third_data_set, + "bpmn_process_identifier": "test_process_to_call_subprocess", + }, + "top_level_call_activity": {"data": third_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_manual_task_two_second": { + "data": third_data_set, + "bpmn_process_identifier": "top_level_process", + }, + "top_level_subprocess_script_second": { + "data": fourth_data_set, + "bpmn_process_identifier": "top_level_subprocess", + }, + "top_level_subprocess_second": {"data": fourth_data_set, "bpmn_process_identifier": "top_level_process"}, + "test_process_to_call_subprocess_script_second": { + "data": fourth_data_set, + "bpmn_process_identifier": "test_process_to_call_subprocess", + }, + "top_level_call_activity_second": { + "data": fourth_data_set, + "bpmn_process_identifier": "top_level_process", + }, + "end_event_of_manual_task_model": {"data": fifth_data_set, "bpmn_process_identifier": "top_level_process"}, } - spiff_tasks_checked_once: list = [] + spiff_tasks_checked: list[str] = [] # TODO: also check task data here from the spiff_task directly to ensure we hydrated spiff correctly - def assert_spiff_task_is_in_process(spiff_task_identifier: str, bpmn_process_identifier: str) -> None: - if spiff_task.task_spec.name == spiff_task_identifier: - expected_task_data_key = spiff_task.task_spec.name - if spiff_task.task_spec.name in spiff_tasks_checked_once: + def assert_spiff_task_is_in_process(spiff_task: SpiffTask) -> None: + spiff_task_identifier = spiff_task.task_spec.name + if spiff_task_identifier in expected_task_data: + bpmn_process_identifier = expected_task_data[spiff_task_identifier]["bpmn_process_identifier"] + expected_task_data_key = spiff_task_identifier + if spiff_task_identifier in spiff_tasks_checked: expected_task_data_key = f"{spiff_task.task_spec.name}_second" - expected_python_env_data = expected_task_data[expected_task_data_key] + assert expected_task_data_key not in spiff_tasks_checked + + spiff_tasks_checked.append(expected_task_data_key) + + expected_python_env_data = expected_task_data[expected_task_data_key]["data"] base_failure_message = ( f"Failed on {bpmn_process_identifier} - {spiff_task_identifier} - task data key" f" {expected_task_data_key}." ) + + count_failure_message = ( + f"{base_failure_message} There are more than 2 entries of this task in the db." + " There should only ever be max 2." + ) + task_models_with_bpmn_identifier_count = ( + TaskModel.query.join(TaskDefinitionModel) + .filter(TaskDefinitionModel.bpmn_identifier == spiff_task.task_spec.name) + .count() + ) + assert task_models_with_bpmn_identifier_count < 3, count_failure_message task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() assert task_model.start_in_seconds is not None @@ -466,7 +512,9 @@ class TestProcessInstanceProcessor(BaseTest): task_definition = task_model.task_definition assert task_definition.bpmn_identifier == spiff_task_identifier assert task_definition.bpmn_name == spiff_task_identifier.replace("_", " ").title() - assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier + assert ( + task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier + ), base_failure_message message = ( f"{base_failure_message} Expected: {sorted(expected_python_env_data)}. Received:" @@ -474,18 +522,14 @@ class TestProcessInstanceProcessor(BaseTest): ) # TODO: if we split out env data again we will need to use it here instead of json_data # assert task_model.python_env_data() == expected_python_env_data, message + # import pdb; pdb.set_trace() assert task_model.json_data() == expected_python_env_data, message - spiff_tasks_checked_once.append(spiff_task.task_spec.name) all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() assert len(all_spiff_tasks) > 1 for spiff_task in all_spiff_tasks: assert spiff_task.state == TaskState.COMPLETED - assert_spiff_task_is_in_process( - "test_process_to_call_subprocess_script", "test_process_to_call_subprocess" - ) - assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess") - assert_spiff_task_is_in_process("top_level_script", "top_level_process") + assert_spiff_task_is_in_process(spiff_task) if spiff_task.task_spec.name == "top_level_call_activity": # the task id / guid of the call activity gets used as the guid of the bpmn process that it calls @@ -513,7 +557,14 @@ class TestProcessInstanceProcessor(BaseTest): assert direct_parent_process is not None assert direct_parent_process.bpmn_process_definition.bpmn_identifier == "test_process_to_call" - assert processor.get_data() == fifth_data_set + for task_bpmn_identifier in expected_task_data.keys(): + message = ( + f"Expected to have seen a task with a bpmn_identifier of {task_bpmn_identifier} but did not. " + f"Only saw {sorted(spiff_tasks_checked)}" + ) + assert task_bpmn_identifier in spiff_tasks_checked, message + + assert processor.get_data() == sixth_data_set def test_does_not_recreate_human_tasks_on_multiple_saves( self, From 3f365d462c45b07000ec7d1a8b96c173d2a8a789 Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 30 Mar 2023 11:16:44 -0400 Subject: [PATCH 06/11] remove pdb w/ burnettk --- .../unit/test_process_instance_processor.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 01624597d..e4db37323 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -360,9 +360,6 @@ class TestProcessInstanceProcessor(BaseTest): spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - import pdb - - pdb.set_trace() assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( From 7d1f01ee026fb3bc91b0b1c8cfc1a7453dec499c Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 30 Mar 2023 12:41:42 -0400 Subject: [PATCH 07/11] do not save predicted tasks to the db w/ burnettk --- .../services/task_service.py | 36 ++++++++--- .../services/workflow_execution_service.py | 3 - .../unit/test_process_instance_processor.py | 60 +++++++++++-------- 3 files changed, 64 insertions(+), 35 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 671d415ca..1614815c1 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -68,12 +68,15 @@ class TaskService: spiff_task: SpiffTask, ) -> None: for child_spiff_task in spiff_task.children: - self.update_task_model_with_spiff_task( - spiff_task=child_spiff_task, - ) - self.process_spiff_task_children( - spiff_task=child_spiff_task, - ) + if child_spiff_task._has_state(TaskState.PREDICTED_MASK): + self.__class__.remove_spiff_task_from_parent(child_spiff_task, self.task_models) + else: + self.update_task_model_with_spiff_task( + spiff_task=child_spiff_task, + ) + self.process_spiff_task_children( + spiff_task=child_spiff_task, + ) def process_spiff_task_parents( self, @@ -137,7 +140,7 @@ class TaskService: ) self.process_instance_events[task_model.guid] = process_instance_event - # self.update_bpmn_process(spiff_task.workflow, bpmn_process) + self.update_bpmn_process(spiff_task.workflow, bpmn_process) return task_model def update_bpmn_process( @@ -315,7 +318,7 @@ class TaskService: if "subprocess_specs" in bpmn_process_dict: bpmn_process_dict.pop("subprocess_specs") - new_task_models = {} + new_task_models: dict[str, TaskModel] = {} new_json_data_dicts: dict[str, JsonDataDict] = {} bpmn_process = None @@ -386,7 +389,12 @@ class TaskService: if task_properties["task_spec"] == "Root": continue + # we are going to avoid saving likely and maybe tasks to the db. + # that means we need to remove them from their parents' lists of children as well. spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) + if spiff_task._has_state(TaskState.PREDICTED_MASK): + cls.remove_spiff_task_from_parent(spiff_task, new_task_models) + continue task_model = TaskModel.query.filter_by(guid=task_id).first() if task_model is None: @@ -406,6 +414,18 @@ class TaskService: new_json_data_dicts[python_env_dict["hash"]] = python_env_dict return (bpmn_process, new_task_models, new_json_data_dicts) + @classmethod + def remove_spiff_task_from_parent(cls, spiff_task: SpiffTask, task_models: dict[str, TaskModel]) -> None: + """Removes the given spiff task from its parent and then updates the task_models dict with the changes.""" + spiff_task_parent_guid = str(spiff_task.parent.id) + spiff_task_guid = str(spiff_task.id) + if spiff_task_parent_guid in task_models: + parent_task_model = task_models[spiff_task_parent_guid] + new_parent_properties_json = copy.copy(parent_task_model.properties_json) + new_parent_properties_json["children"].remove(spiff_task_guid) + parent_task_model.properties_json = new_parent_properties_json + task_models[spiff_task_parent_guid] = parent_task_model + @classmethod def update_task_data_on_bpmn_process( cls, bpmn_process: BpmnProcessModel, bpmn_process_data_dict: dict diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 64e197ef2..3fd433e59 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -61,9 +61,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.current_task_model: Optional[TaskModel] = None self.current_task_start_in_seconds: Optional[float] = None - # self.task_models: dict[str, TaskModel] = {} - # self.json_data_dicts: dict[str, JsonDataDict] = {} - # self.process_instance_events: dict[str, ProcessInstanceEventModel] = {} self.last_completed_spiff_task: Optional[SpiffTask] = None self.task_service = TaskService( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index e4db37323..c07f2c790 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -418,54 +418,59 @@ class TestProcessInstanceProcessor(BaseTest): processor_final = ProcessInstanceProcessor(process_instance_relookup) assert process_instance_relookup.status == "complete" - first_data_set = {"set_in_top_level_script": 1} - second_data_set = { - **first_data_set, + data_set_1 = {"set_in_top_level_script": 1} + data_set_2 = { + **data_set_1, **{"set_in_top_level_subprocess": 1, "we_move_on": False}, } - third_data_set = { - **second_data_set, + data_set_3 = { + **data_set_2, **{ - "set_in_test_process_to_call_script": 1, "set_in_test_process_to_call_subprocess_subprocess_script": 1, "set_in_test_process_to_call_subprocess_script": 1, }, } - fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}} - fifth_data_set = {**fourth_data_set, **{"set_top_level_process_script_after_gate": 1}} - sixth_data_set = {**fifth_data_set, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}} + data_set_4 = { + **data_set_3, + **{ + "set_in_test_process_to_call_script": 1, + }, + } + data_set_5 = {**data_set_4, **{"a": 1, "we_move_on": True}} + data_set_6 = {**data_set_5, **{"set_top_level_process_script_after_gate": 1}} + data_set_7 = {**data_set_6, **{"validate_only": False, "set_top_level_process_script_after_gate": 1}} expected_task_data = { - "top_level_script": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, - "top_level_manual_task_one": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, - "top_level_manual_task_two": {"data": first_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_script": {"data": data_set_1, "bpmn_process_identifier": "top_level_process"}, + "top_level_manual_task_one": {"data": data_set_1, "bpmn_process_identifier": "top_level_process"}, + "top_level_manual_task_two": {"data": data_set_1, "bpmn_process_identifier": "top_level_process"}, "top_level_subprocess_script": { - "data": second_data_set, + "data": data_set_2, "bpmn_process_identifier": "top_level_subprocess", }, - "top_level_subprocess": {"data": second_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_subprocess": {"data": data_set_2, "bpmn_process_identifier": "top_level_process"}, "test_process_to_call_subprocess_script": { - "data": third_data_set, + "data": data_set_3, "bpmn_process_identifier": "test_process_to_call_subprocess", }, - "top_level_call_activity": {"data": third_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_call_activity": {"data": data_set_4, "bpmn_process_identifier": "top_level_process"}, "top_level_manual_task_two_second": { - "data": third_data_set, + "data": data_set_4, "bpmn_process_identifier": "top_level_process", }, "top_level_subprocess_script_second": { - "data": fourth_data_set, + "data": data_set_5, "bpmn_process_identifier": "top_level_subprocess", }, - "top_level_subprocess_second": {"data": fourth_data_set, "bpmn_process_identifier": "top_level_process"}, + "top_level_subprocess_second": {"data": data_set_5, "bpmn_process_identifier": "top_level_process"}, "test_process_to_call_subprocess_script_second": { - "data": fourth_data_set, + "data": data_set_5, "bpmn_process_identifier": "test_process_to_call_subprocess", }, "top_level_call_activity_second": { - "data": fourth_data_set, + "data": data_set_5, "bpmn_process_identifier": "top_level_process", }, - "end_event_of_manual_task_model": {"data": fifth_data_set, "bpmn_process_identifier": "top_level_process"}, + "end_event_of_manual_task_model": {"data": data_set_6, "bpmn_process_identifier": "top_level_process"}, } spiff_tasks_checked: list[str] = [] @@ -496,6 +501,7 @@ class TestProcessInstanceProcessor(BaseTest): ) task_models_with_bpmn_identifier_count = ( TaskModel.query.join(TaskDefinitionModel) + .filter(TaskModel.process_instance_id == process_instance_relookup.id) .filter(TaskDefinitionModel.bpmn_identifier == spiff_task.task_spec.name) .count() ) @@ -519,7 +525,6 @@ class TestProcessInstanceProcessor(BaseTest): ) # TODO: if we split out env data again we will need to use it here instead of json_data # assert task_model.python_env_data() == expected_python_env_data, message - # import pdb; pdb.set_trace() assert task_model.json_data() == expected_python_env_data, message all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() @@ -561,7 +566,14 @@ class TestProcessInstanceProcessor(BaseTest): ) assert task_bpmn_identifier in spiff_tasks_checked, message - assert processor.get_data() == sixth_data_set + task_models_that_are_predicted_count = ( + TaskModel.query.filter(TaskModel.process_instance_id == process_instance_relookup.id) + .filter(TaskModel.state.in_(["LIKELY", "MAYBE"])) # type: ignore + .count() + ) + assert task_models_that_are_predicted_count == 0 + + assert processor.get_data() == data_set_7 def test_does_not_recreate_human_tasks_on_multiple_saves( self, From c5806ee53d8f6543d554a4512a62cbab0adcf5fd Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 30 Mar 2023 15:25:44 -0400 Subject: [PATCH 08/11] fixed some failing tests except for test_send_event --- .../services/task_service.py | 15 +++++------ .../services/workflow_execution_service.py | 7 +++-- .../tests/data/manual_task/manual_task.bpmn | 8 +++--- .../integration/test_process_api.py | 26 +++++-------------- .../unit/test_process_instance_processor.py | 14 ++++++++-- 5 files changed, 33 insertions(+), 37 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 1614815c1..fef8265fb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -70,13 +70,13 @@ class TaskService: for child_spiff_task in spiff_task.children: if child_spiff_task._has_state(TaskState.PREDICTED_MASK): self.__class__.remove_spiff_task_from_parent(child_spiff_task, self.task_models) - else: - self.update_task_model_with_spiff_task( - spiff_task=child_spiff_task, - ) - self.process_spiff_task_children( - spiff_task=child_spiff_task, - ) + continue + self.update_task_model_with_spiff_task( + spiff_task=child_spiff_task, + ) + self.process_spiff_task_children( + spiff_task=child_spiff_task, + ) def process_spiff_task_parents( self, @@ -148,7 +148,6 @@ class TaskService: spiff_workflow: BpmnWorkflow, bpmn_process: BpmnProcessModel, ) -> None: - # import pdb; pdb.set_trace() new_properties_json = copy.copy(bpmn_process.properties_json) new_properties_json["last_task"] = str(spiff_workflow.last_task) if spiff_workflow.last_task else None new_properties_json["success"] = spiff_workflow.success diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 3fd433e59..43927b830 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -126,13 +126,12 @@ class ExecutionStrategy: def __init__(self, delegate: EngineStepDelegate): """__init__.""" self.delegate = delegate - self.bpmn_process_instance = None def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: pass - def save(self) -> None: - self.delegate.save(self.bpmn_process_instance) + def save(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.save(bpmn_process_instance) class GreedyExecutionStrategy(ExecutionStrategy): @@ -238,7 +237,7 @@ class WorkflowExecutionService: raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe finally: - self.execution_strategy.save() + self.execution_strategy.save(self.bpmn_process_instance) db.session.commit() if save: diff --git a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn index f4d0190bc..ac1486e4b 100644 --- a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn +++ b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn @@ -7,8 +7,8 @@ Flow_0nnh2x9 - - + + ## Hello @@ -16,7 +16,7 @@ Flow_0nnh2x9 - + Flow_0stlaxe Flow_1pmem7s @@ -31,7 +31,7 @@ - + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 89fda503d..c5623f47b 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2616,6 +2616,8 @@ class TestProcessApi(BaseTest): content_type="application/json", data=json.dumps(data), ) + assert response.status_code == 200 + assert response.json is not None assert response.json["status"] == "complete" response = client.get( @@ -2641,9 +2643,9 @@ class TestProcessApi(BaseTest): ) -> None: """Test_script_unit_test_run.""" process_group_id = "test_group" - process_model_id = "process_navigation" - bpmn_file_name = "process_navigation.bpmn" - bpmn_file_location = "process_navigation" + process_model_id = "manual_task" + bpmn_file_name = "manual_task.bpmn" + bpmn_file_location = "manual_task" process_model_identifier = self.create_group_and_model_with_bpmn( client=client, user=with_super_admin_user, @@ -2674,25 +2676,11 @@ class TestProcessApi(BaseTest): headers=self.logged_in_headers(with_super_admin_user), ) - data = { - "dateTime": "PT1H", - "external": True, - "internal": True, - "label": "Event_0e4owa3", - "typename": "TimerEventDefinition", - } - response = client.post( - f"/v1.0/send-event/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}", - headers=self.logged_in_headers(with_super_admin_user), - content_type="application/json", - data=json.dumps(data), - ) - response = client.get( f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/task-info", headers=self.logged_in_headers(with_super_admin_user), ) - assert len(response.json) == 9 + assert len(response.json) == 7 human_task = next(task for task in response.json if task["bpmn_identifier"] == "manual_task_one") response = client.post( @@ -2711,7 +2699,7 @@ class TestProcessApi(BaseTest): headers=self.logged_in_headers(with_super_admin_user), ) assert response.status_code == 200 - assert len(response.json) == 9 + assert len(response.json) == 7 def setup_initial_groups_for_move_tests(self, client: FlaskClient, with_super_admin_user: UserModel) -> None: """Setup_initial_groups_for_move_tests.""" diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index c07f2c790..13a22cedf 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -353,14 +353,24 @@ class TestProcessInstanceProcessor(BaseTest): process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() processor = ProcessInstanceProcessor(process_instance) - # with open("after_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) processor.resume() processor.do_engine_steps(save=True) human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - assert process_instance.status == "complete" + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + processor = ProcessInstanceProcessor(process_instance) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + + # recreate variables to ensure all bpmn json was recreated from scratch from the db + process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + assert process_instance_relookup.status == "complete" def test_properly_saves_tasks_when_running( self, From 5754f44c259026d59b1feaf4292a01c22a5eed60 Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 30 Mar 2023 16:30:34 -0400 Subject: [PATCH 09/11] WIP trying to get resetting to a task within a subprocess working w/ burnettk --- .../services/process_instance_processor.py | 1 + .../services/workflow_execution_service.py | 3 ++ .../integration/test_process_api.py | 1 + .../unit/test_process_instance_processor.py | 32 +++++++++++-------- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 1c32efb80..e572c5ae4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1348,6 +1348,7 @@ class ProcessInstanceProcessor: for task_to_update in tasks_to_update: # print(f"task_to_update: {task_to_update}") + print(f"task_to_update.state: {task_to_update.state}") TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) # if task_to_update.task_definition.bpmn_identifier != 'top_level_process_script_after_gate': diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 43927b830..495ab3107 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -72,6 +72,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): def will_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): self.current_task_start_in_seconds = time.time() + # import pdb; pdb.set_trace() + spiff_task.task_spec._predict(spiff_task, mask=TaskState.NOT_FINISHED_MASK) if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.will_complete_task(spiff_task) @@ -108,6 +110,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): # ): # self._update_task_model_with_spiff_task(waiting_spiff_task) if self.last_completed_spiff_task is not None: + import pdb; pdb.set_trace() self.task_service.process_spiff_task_parents(self.last_completed_spiff_task) self.task_service.process_spiff_task_children(self.last_completed_spiff_task) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index c5623f47b..84d970bda 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2618,6 +2618,7 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 200 assert response.json is not None + import pdb; pdb.set_trace() assert response.json["status"] == "complete" response = client.get( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 13a22cedf..3d949388e 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -347,30 +347,36 @@ class TestProcessInstanceProcessor(BaseTest): human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + + ### NOTES: + # somehow we are hosing the task state so that when completing tasks of a subprocess, the task AFTER the subprocess task + # is not marked READY but instead stays as FUTURE. Running things like: + # self.last_completed_spiff_task.task_spec._update(self.last_completed_spiff_task) + # and + # self.last_completed_spiff_task.task_spec._predict(self.last_completed_spiff_task, mask=TaskState.NOT_FINISHED_MASK) + # did not help. processor.suspend() - ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.id), commit=True) + # import pdb; pdb.set_trace() + task_model_to_reset_to = TaskModel.query.join(TaskDefinitionModel).filter(TaskDefinitionModel.bpmn_identifier == 'top_level_subprocess_script').order_by(TaskModel.id.desc()).first() + assert task_model_to_reset_to is not None + ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid, commit=True) + # import pdb; pdb.set_trace() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() processor = ProcessInstanceProcessor(process_instance) processor.resume() processor.do_engine_steps(save=True) + import pdb; pdb.set_trace() + assert len(process_instance.active_human_tasks) == 1 human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - processor = ProcessInstanceProcessor(process_instance) - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - - # recreate variables to ensure all bpmn json was recreated from scratch from the db - process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - assert process_instance_relookup.status == "complete" + assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( self, From be5bf319744e665063d21fb5cef0e7d718da7044 Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 31 Mar 2023 10:48:16 -0400 Subject: [PATCH 10/11] added test for loopback to subprocess and fixed issue w/ burnettk --- .../services/process_instance_processor.py | 5 + .../services/task_service.py | 26 ++-- .../services/workflow_execution_service.py | 4 +- .../loopback_to_subprocess.bpmn | 116 ++++++++++++++++++ .../unit/test_process_instance_processor.py | 46 ++++++- 5 files changed, 180 insertions(+), 17 deletions(-) create mode 100644 spiffworkflow-backend/tests/data/loopback_to_subprocess/loopback_to_subprocess.bpmn diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 1fff87c15..a5bab8afd 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1897,6 +1897,11 @@ class ProcessInstanceProcessor: all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) return [t for t in all_tasks if t.state in [TaskState.WAITING, TaskState.READY]] + def get_task_by_guid( + self, task_guid: str + ) -> Optional[SpiffTask]: + return self.bpmn_process_instance.get_task_from_id(UUID(task_guid)) + @classmethod def get_task_by_bpmn_identifier( cls, bpmn_task_identifier: str, bpmn_process_instance: BpmnWorkflow diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index fef8265fb..e9839fa74 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -60,7 +60,7 @@ class TaskService: spiff_task: SpiffTask, ) -> None: self.process_spiff_task_children(spiff_task) - self.process_spiff_task_parents(spiff_task) + self.process_spiff_task_parent_subprocess_tasks(spiff_task) self.save_objects_to_database() def process_spiff_task_children( @@ -68,9 +68,9 @@ class TaskService: spiff_task: SpiffTask, ) -> None: for child_spiff_task in spiff_task.children: - if child_spiff_task._has_state(TaskState.PREDICTED_MASK): - self.__class__.remove_spiff_task_from_parent(child_spiff_task, self.task_models) - continue + # if child_spiff_task._has_state(TaskState.PREDICTED_MASK): + # self.__class__.remove_spiff_task_from_parent(child_spiff_task, self.task_models) + # continue self.update_task_model_with_spiff_task( spiff_task=child_spiff_task, ) @@ -78,10 +78,15 @@ class TaskService: spiff_task=child_spiff_task, ) - def process_spiff_task_parents( + def process_spiff_task_parent_subprocess_tasks( self, spiff_task: SpiffTask, ) -> None: + """Find the parent subprocess of a given spiff_task and update its data. + + This will also process that subprocess task's children and will recurse upwards + to process its parent subprocesses as well. + """ (parent_subprocess_guid, _parent_subprocess) = self.__class__.task_subprocess(spiff_task) if parent_subprocess_guid is not None: spiff_task_of_parent_subprocess = spiff_task.workflow._get_outermost_workflow().get_task_from_id( @@ -92,7 +97,10 @@ class TaskService: self.update_task_model_with_spiff_task( spiff_task=spiff_task_of_parent_subprocess, ) - self.process_spiff_task_parents( + self.process_spiff_task_children( + spiff_task=spiff_task_of_parent_subprocess, + ) + self.process_spiff_task_parent_subprocess_tasks( spiff_task=spiff_task_of_parent_subprocess, ) @@ -391,9 +399,9 @@ class TaskService: # we are going to avoid saving likely and maybe tasks to the db. # that means we need to remove them from their parents' lists of children as well. spiff_task = spiff_workflow.get_task_from_id(UUID(task_id)) - if spiff_task._has_state(TaskState.PREDICTED_MASK): - cls.remove_spiff_task_from_parent(spiff_task, new_task_models) - continue + # if spiff_task._has_state(TaskState.PREDICTED_MASK): + # cls.remove_spiff_task_from_parent(spiff_task, new_task_models) + # continue task_model = TaskModel.query.filter_by(guid=task_id).first() if task_model is None: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 495ab3107..310286e76 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -110,8 +110,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): # ): # self._update_task_model_with_spiff_task(waiting_spiff_task) if self.last_completed_spiff_task is not None: - import pdb; pdb.set_trace() - self.task_service.process_spiff_task_parents(self.last_completed_spiff_task) + # import pdb; pdb.set_trace() + self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) self.task_service.process_spiff_task_children(self.last_completed_spiff_task) def _should_update_task_model(self) -> bool: diff --git a/spiffworkflow-backend/tests/data/loopback_to_subprocess/loopback_to_subprocess.bpmn b/spiffworkflow-backend/tests/data/loopback_to_subprocess/loopback_to_subprocess.bpmn new file mode 100644 index 000000000..eff8cd2f5 --- /dev/null +++ b/spiffworkflow-backend/tests/data/loopback_to_subprocess/loopback_to_subprocess.bpmn @@ -0,0 +1,116 @@ + + + + + Flow_1dk6oyl + + + Flow_0s9lss3 + Flow_02xy1ag + Flow_11uu31d + + + + Flow_0sw85uk + Flow_0s9lss3 + x=1 + + + Flow_02xy1ag + + + x==2 + + + + + Flow_1dk6oyl + Flow_11uu31d + Flow_0sw85uk + + Flow_0ih1i19 + + + + Flow_0dua5j8 + + + + + HEY MANUAL + + Flow_0ih1i19 + Flow_0dua5j8 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 3d949388e..34c71e7cb 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -696,14 +696,48 @@ class TestProcessInstanceProcessor(BaseTest): assert len(process_instance.human_tasks) == 2 human_task_two = process_instance.active_human_tasks[0] - # this is just asserting the way the functionality currently works in spiff. - # we would actually expect this to change one day if we stop reusing the same guid - # when we re-do a task. - # assert human_task_two.task_id == human_task_one.task_id - - # EDIT: when using feature/remove-loop-reset branch of SpiffWorkflow, these should be different. assert human_task_two.task_id != human_task_one.task_id + def test_it_can_loopback_to_previous_bpmn_subprocess_with_gateway( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + ) -> None: + initiator_user = self.find_or_create_user("initiator_user") + process_model = load_test_spec( + process_model_id="test_group/loopback_to_subprocess", + process_model_source_directory="loopback_to_subprocess", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=initiator_user + ) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + + assert len(process_instance.active_human_tasks) == 1 + assert len(process_instance.human_tasks) == 1 + human_task_one = process_instance.active_human_tasks[0] + + spiff_task = processor.get_task_by_guid(human_task_one.task_id) + ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_one) + + processor = ProcessInstanceProcessor(process_instance) + assert len(process_instance.active_human_tasks) == 1 + assert len(process_instance.human_tasks) == 2 + human_task_two = process_instance.active_human_tasks[0] + spiff_task = processor.get_task_by_guid(human_task_two.task_id) + ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_two) + + import pdb; pdb.set_trace() + # ensure this does not raise a KeyError + processor = ProcessInstanceProcessor(process_instance) + assert len(process_instance.active_human_tasks) == 1 + assert len(process_instance.human_tasks) == 3 + human_task_three = process_instance.active_human_tasks[0] + spiff_task = processor.get_task_by_guid(human_task_three.task_id) + ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_three) + def test_task_data_is_set_even_if_process_instance_errors( self, app: Flask, From 783faa7ce93797d8341ba7fec22831f4f9e38854 Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 31 Mar 2023 10:57:13 -0400 Subject: [PATCH 11/11] some cleanup before merging to main w/ burnettk --- .../services/process_instance_processor.py | 241 ++++++++------- .../services/workflow_execution_service.py | 2 - .../integration/test_process_api.py | 1 - .../unit/test_process_instance_processor.py | 279 +++++++++--------- .../src/routes/ProcessInstanceShow.tsx | 20 +- 5 files changed, 269 insertions(+), 274 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index a5bab8afd..93cd64fb2 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,6 +1,5 @@ """Process_instance_processor.""" import _strptime # type: ignore -import copy import decimal import json import logging @@ -51,8 +50,6 @@ from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ign from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore -from sqlalchemy import and_ -from sqlalchemy import or_ from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel @@ -1266,123 +1263,123 @@ class ProcessInstanceProcessor: cls, process_instance: ProcessInstanceModel, to_task_guid: str, commit: Optional[bool] = False ) -> None: """Reset a process to an earlier state.""" - # raise Exception("This feature to reset a process instance to a given task is currently unavaiable") - cls.add_event_to_process_instance( - process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid - ) - - to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first() - if to_task_model is None: - raise TaskNotFoundError( - f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - ) - - parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( - to_task_model - ) - [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] - [p.id for p in parent_bpmn_processes] - tasks_to_update_query = db.session.query(TaskModel).filter( - and_( - or_( - TaskModel.end_in_seconds > to_task_model.end_in_seconds, - TaskModel.end_in_seconds.is_(None), # type: ignore - ), - TaskModel.process_instance_id == process_instance.id, - # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore - ) - ) - tasks_to_update = tasks_to_update_query.all() - - # run all queries before making changes to task_model - if commit: - # tasks_to_delete_query = db.session.query(TaskModel).filter( - # and_( - # or_( - # TaskModel.end_in_seconds > to_task_model.end_in_seconds, - # TaskModel.end_in_seconds.is_not(None), # type: ignore - # ), - # TaskModel.process_instance_id == process_instance.id, - # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore - # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore - # ) - # ) - # - # tasks_to_delete = tasks_to_delete_query.all() - # - # # delete any later tasks from to_task_model and delete bpmn processes that may be - # # link directly to one of those tasks. - # tasks_to_delete_guids = [t.guid for t in tasks_to_delete] - # tasks_to_delete_ids = [t.id for t in tasks_to_delete] - # bpmn_processes_to_delete = BpmnProcessModel.query.filter( - # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore - # ).order_by(BpmnProcessModel.id.desc()).all() - # human_tasks_to_delete = HumanTaskModel.query.filter( - # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore - # ).all() - # - # - # import pdb; pdb.set_trace() - # # ensure the correct order for foreign keys - # for human_task_to_delete in human_tasks_to_delete: - # db.session.delete(human_task_to_delete) - # db.session.commit() - # for task_to_delete in tasks_to_delete: - # db.session.delete(task_to_delete) - # db.session.commit() - # for bpmn_process_to_delete in bpmn_processes_to_delete: - # db.session.delete(bpmn_process_to_delete) - # db.session.commit() - - related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first() - if related_human_task is not None: - db.session.delete(related_human_task) - - tasks_to_update_ids = [t.id for t in tasks_to_update] - human_tasks_to_delete = HumanTaskModel.query.filter( - HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore - ).all() - for human_task_to_delete in human_tasks_to_delete: - db.session.delete(human_task_to_delete) - db.session.commit() - - for task_to_update in tasks_to_update: - # print(f"task_to_update: {task_to_update}") - print(f"task_to_update.state: {task_to_update.state}") - TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) - # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) - # if task_to_update.task_definition.bpmn_identifier != 'top_level_process_script_after_gate': - # TaskService.reset_task_model(task_to_update, state='FUTURE', commit=commit) - # else: - # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) - - parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() - if parent_task_model is None: - raise TaskNotFoundError( - f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" - ) - - TaskService.reset_task_model( - to_task_model, - state="READY", - json_data_hash=parent_task_model.json_data_hash, - python_env_data_hash=parent_task_model.python_env_data_hash, - commit=commit, - ) - for task_model in task_models_of_parent_bpmn_processes: - TaskService.reset_task_model(task_model, state="WAITING", commit=commit) - - bpmn_process = to_task_model.bpmn_process - properties_json = copy.copy(bpmn_process.properties_json) - properties_json["last_task"] = parent_task_model.guid - bpmn_process.properties_json = properties_json - db.session.add(bpmn_process) - db.session.commit() - - if commit: - processor = ProcessInstanceProcessor(process_instance) - processor.save() - processor.suspend() + raise Exception("This feature to reset a process instance to a given task is currently unavaiable") + # cls.add_event_to_process_instance( + # process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid + # ) + # + # to_task_model = TaskModel.query.filter_by(guid=to_task_guid, process_instance_id=process_instance.id).first() + # if to_task_model is None: + # raise TaskNotFoundError( + # f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" + # ) + # + # parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( + # to_task_model + # ) + # [p.guid for p in task_models_of_parent_bpmn_processes if p.guid] + # [p.id for p in parent_bpmn_processes] + # tasks_to_update_query = db.session.query(TaskModel).filter( + # and_( + # or_( + # TaskModel.end_in_seconds > to_task_model.end_in_seconds, + # TaskModel.end_in_seconds.is_(None), # type: ignore + # ), + # TaskModel.process_instance_id == process_instance.id, + # # TaskModel.bpmn_process_id.in_(parent_bpmn_processes_ids), # type: ignore + # ) + # ) + # tasks_to_update = tasks_to_update_query.all() + # + # # run all queries before making changes to task_model + # if commit: + # # tasks_to_delete_query = db.session.query(TaskModel).filter( + # # and_( + # # or_( + # # TaskModel.end_in_seconds > to_task_model.end_in_seconds, + # # TaskModel.end_in_seconds.is_not(None), # type: ignore + # # ), + # # TaskModel.process_instance_id == process_instance.id, + # # TaskModel.guid.not_in(task_models_of_parent_bpmn_processes_guids), # type: ignore + # # TaskModel.bpmn_process_id.not_in(parent_bpmn_processes_ids), # type: ignore + # # ) + # # ) + # # + # # tasks_to_delete = tasks_to_delete_query.all() + # # + # # # delete any later tasks from to_task_model and delete bpmn processes that may be + # # # link directly to one of those tasks. + # # tasks_to_delete_guids = [t.guid for t in tasks_to_delete] + # # tasks_to_delete_ids = [t.id for t in tasks_to_delete] + # # bpmn_processes_to_delete = BpmnProcessModel.query.filter( + # # BpmnProcessModel.guid.in_(tasks_to_delete_guids) # type: ignore + # # ).order_by(BpmnProcessModel.id.desc()).all() + # # human_tasks_to_delete = HumanTaskModel.query.filter( + # # HumanTaskModel.task_model_id.in_(tasks_to_delete_ids) # type: ignore + # # ).all() + # # + # # + # # import pdb; pdb.set_trace() + # # # ensure the correct order for foreign keys + # # for human_task_to_delete in human_tasks_to_delete: + # # db.session.delete(human_task_to_delete) + # # db.session.commit() + # # for task_to_delete in tasks_to_delete: + # # db.session.delete(task_to_delete) + # # db.session.commit() + # # for bpmn_process_to_delete in bpmn_processes_to_delete: + # # db.session.delete(bpmn_process_to_delete) + # # db.session.commit() + # + # related_human_task = HumanTaskModel.query.filter_by(task_model_id=to_task_model.id).first() + # if related_human_task is not None: + # db.session.delete(related_human_task) + # + # tasks_to_update_ids = [t.id for t in tasks_to_update] + # human_tasks_to_delete = HumanTaskModel.query.filter( + # HumanTaskModel.task_model_id.in_(tasks_to_update_ids) # type: ignore + # ).all() + # for human_task_to_delete in human_tasks_to_delete: + # db.session.delete(human_task_to_delete) + # db.session.commit() + # + # for task_to_update in tasks_to_update: + # # print(f"task_to_update: {task_to_update}") + # print(f"task_to_update.state: {task_to_update.state}") + # TaskService.reset_task_model(task_to_update, state="FUTURE", commit=commit) + # # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) + # # if task_to_update.task_definition.bpmn_identifier != 'top_level_process_script_after_gate': + # # TaskService.reset_task_model(task_to_update, state='FUTURE', commit=commit) + # # else: + # # TaskService.reset_task_model(task_to_update, state=task_to_update.state, commit=commit) + # + # parent_task_model = TaskModel.query.filter_by(guid=to_task_model.properties_json["parent"]).first() + # if parent_task_model is None: + # raise TaskNotFoundError( + # f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" + # ) + # + # TaskService.reset_task_model( + # to_task_model, + # state="READY", + # json_data_hash=parent_task_model.json_data_hash, + # python_env_data_hash=parent_task_model.python_env_data_hash, + # commit=commit, + # ) + # for task_model in task_models_of_parent_bpmn_processes: + # TaskService.reset_task_model(task_model, state="WAITING", commit=commit) + # + # bpmn_process = to_task_model.bpmn_process + # properties_json = copy.copy(bpmn_process.properties_json) + # properties_json["last_task"] = parent_task_model.guid + # bpmn_process.properties_json = properties_json + # db.session.add(bpmn_process) + # db.session.commit() + # + # if commit: + # processor = ProcessInstanceProcessor(process_instance) + # processor.save() + # processor.suspend() @staticmethod def get_parser() -> MyCustomParser: @@ -1897,9 +1894,7 @@ class ProcessInstanceProcessor: all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) return [t for t in all_tasks if t.state in [TaskState.WAITING, TaskState.READY]] - def get_task_by_guid( - self, task_guid: str - ) -> Optional[SpiffTask]: + def get_task_by_guid(self, task_guid: str) -> Optional[SpiffTask]: return self.bpmn_process_instance.get_task_from_id(UUID(task_guid)) @classmethod diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 310286e76..babff151f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -72,7 +72,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): def will_complete_task(self, spiff_task: SpiffTask) -> None: if self._should_update_task_model(): self.current_task_start_in_seconds = time.time() - # import pdb; pdb.set_trace() spiff_task.task_spec._predict(spiff_task, mask=TaskState.NOT_FINISHED_MASK) if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.will_complete_task(spiff_task) @@ -110,7 +109,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): # ): # self._update_task_model_with_spiff_task(waiting_spiff_task) if self.last_completed_spiff_task is not None: - # import pdb; pdb.set_trace() self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) self.task_service.process_spiff_task_children(self.last_completed_spiff_task) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 84d970bda..c5623f47b 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2618,7 +2618,6 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 200 assert response.json is not None - import pdb; pdb.set_trace() assert response.json["status"] == "complete" response = client.get( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 34c71e7cb..1caa952d3 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -16,7 +16,6 @@ from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.task import TaskModel # noqa: F401 -from spiffworkflow_backend.models.task_definition import TaskDefinitionModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.authorization_service import AuthorizationService from spiffworkflow_backend.services.authorization_service import ( @@ -258,125 +257,128 @@ class TestProcessInstanceProcessor(BaseTest): assert spiff_task is not None assert spiff_task.state == TaskState.COMPLETED - def test_properly_resets_process_to_given_task( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") - initiator_user = self.find_or_create_user("initiator_user") - finance_user_three = self.find_or_create_user("testuser3") - assert initiator_user.principal is not None - assert finance_user_three.principal is not None - AuthorizationService.import_permissions_from_yaml_file() - - finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() - assert finance_group is not None - - process_model = load_test_spec( - process_model_id="test_group/manual_task", - process_model_source_directory="manual_task", - ) - process_instance = self.create_process_instance_from_process_model( - process_model=process_model, user=initiator_user - ) - processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) - assert len(process_instance.active_human_tasks) == 1 - initial_human_task_id = process_instance.active_human_tasks[0].id - - # save again to ensure we go attempt to process the human tasks again - processor.save() - - assert len(process_instance.active_human_tasks) == 1 - assert initial_human_task_id == process_instance.active_human_tasks[0].id - - processor = ProcessInstanceProcessor(process_instance) - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( - human_task_one.task_name, processor.bpmn_process_instance - ) - assert spiff_manual_task is not None - - processor.suspend() - ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) - - process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - processor = ProcessInstanceProcessor(process_instance) - processor.resume() - processor.do_engine_steps(save=True) - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - assert process_instance.status == "complete" - - def test_properly_resets_process_to_given_task_with_call_activity( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, - ) -> None: - self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") - initiator_user = self.find_or_create_user("initiator_user") - finance_user_three = self.find_or_create_user("testuser3") - assert initiator_user.principal is not None - assert finance_user_three.principal is not None - AuthorizationService.import_permissions_from_yaml_file() - - finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() - assert finance_group is not None - - process_model = load_test_spec( - process_model_id="test_group/manual_task_with_subprocesses", - process_model_source_directory="manual_task_with_subprocesses", - ) - process_instance = self.create_process_instance_from_process_model( - process_model=process_model, user=initiator_user - ) - processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) - # with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) - assert len(process_instance.active_human_tasks) == 1 - initial_human_task_id = process_instance.active_human_tasks[0].id - assert len(process_instance.active_human_tasks) == 1 - assert initial_human_task_id == process_instance.active_human_tasks[0].id - - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - - ### NOTES: - # somehow we are hosing the task state so that when completing tasks of a subprocess, the task AFTER the subprocess task - # is not marked READY but instead stays as FUTURE. Running things like: - # self.last_completed_spiff_task.task_spec._update(self.last_completed_spiff_task) - # and - # self.last_completed_spiff_task.task_spec._predict(self.last_completed_spiff_task, mask=TaskState.NOT_FINISHED_MASK) - # did not help. - - processor.suspend() - # import pdb; pdb.set_trace() - task_model_to_reset_to = TaskModel.query.join(TaskDefinitionModel).filter(TaskDefinitionModel.bpmn_identifier == 'top_level_subprocess_script').order_by(TaskModel.id.desc()).first() - assert task_model_to_reset_to is not None - ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid, commit=True) - # import pdb; pdb.set_trace() - - process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - processor = ProcessInstanceProcessor(process_instance) - processor.resume() - processor.do_engine_steps(save=True) - import pdb; pdb.set_trace() - assert len(process_instance.active_human_tasks) == 1 - human_task_one = process_instance.active_human_tasks[0] - spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) - - assert process_instance.status == "complete" + # def test_properly_resets_process_to_given_task( + # self, + # app: Flask, + # client: FlaskClient, + # with_db_and_bpmn_file_cleanup: None, + # with_super_admin_user: UserModel, + # ) -> None: + # self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") + # initiator_user = self.find_or_create_user("initiator_user") + # finance_user_three = self.find_or_create_user("testuser3") + # assert initiator_user.principal is not None + # assert finance_user_three.principal is not None + # AuthorizationService.import_permissions_from_yaml_file() + # + # finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() + # assert finance_group is not None + # + # process_model = load_test_spec( + # process_model_id="test_group/manual_task", + # process_model_source_directory="manual_task", + # ) + # process_instance = self.create_process_instance_from_process_model( + # process_model=process_model, user=initiator_user + # ) + # processor = ProcessInstanceProcessor(process_instance) + # processor.do_engine_steps(save=True) + # assert len(process_instance.active_human_tasks) == 1 + # initial_human_task_id = process_instance.active_human_tasks[0].id + # + # # save again to ensure we go attempt to process the human tasks again + # processor.save() + # + # assert len(process_instance.active_human_tasks) == 1 + # assert initial_human_task_id == process_instance.active_human_tasks[0].id + # + # processor = ProcessInstanceProcessor(process_instance) + # human_task_one = process_instance.active_human_tasks[0] + # spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( + # human_task_one.task_name, processor.bpmn_process_instance + # ) + # assert spiff_manual_task is not None + # + # processor.suspend() + # ProcessInstanceProcessor.reset_process(process_instance, str(spiff_manual_task.parent.id), commit=True) + # + # process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + # processor = ProcessInstanceProcessor(process_instance) + # processor.resume() + # processor.do_engine_steps(save=True) + # human_task_one = process_instance.active_human_tasks[0] + # spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + # assert process_instance.status == "complete" + # + # def test_properly_resets_process_to_given_task_with_call_activity( + # self, + # app: Flask, + # client: FlaskClient, + # with_db_and_bpmn_file_cleanup: None, + # with_super_admin_user: UserModel, + # ) -> None: + # self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") + # initiator_user = self.find_or_create_user("initiator_user") + # finance_user_three = self.find_or_create_user("testuser3") + # assert initiator_user.principal is not None + # assert finance_user_three.principal is not None + # AuthorizationService.import_permissions_from_yaml_file() + # + # finance_group = GroupModel.query.filter_by(identifier="Finance Team").first() + # assert finance_group is not None + # + # process_model = load_test_spec( + # process_model_id="test_group/manual_task_with_subprocesses", + # process_model_source_directory="manual_task_with_subprocesses", + # ) + # process_instance = self.create_process_instance_from_process_model( + # process_model=process_model, user=initiator_user + # ) + # processor = ProcessInstanceProcessor(process_instance) + # processor.do_engine_steps(save=True) + # # with open("before_reset.json", 'w') as f: f.write(json.dumps(processor.serialize(), indent=2)) + # assert len(process_instance.active_human_tasks) == 1 + # initial_human_task_id = process_instance.active_human_tasks[0].id + # assert len(process_instance.active_human_tasks) == 1 + # assert initial_human_task_id == process_instance.active_human_tasks[0].id + # + # human_task_one = process_instance.active_human_tasks[0] + # spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + # human_task_one = process_instance.active_human_tasks[0] + # spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + # + # # NOTES: + # # somehow we are hosing the task state so that when completing tasks of a subprocess, the task AFTER the subprocess task + # # is not marked READY but instead stays as FUTURE. Running things like: + # # self.last_completed_spiff_task.task_spec._update(self.last_completed_spiff_task) + # # and + # # self.last_completed_spiff_task.task_spec._predict(self.last_completed_spiff_task, mask=TaskState.NOT_FINISHED_MASK) + # # did not help. + # + # processor.suspend() + # task_model_to_reset_to = ( + # TaskModel.query.join(TaskDefinitionModel) + # .filter(TaskDefinitionModel.bpmn_identifier == "top_level_subprocess_script") + # .order_by(TaskModel.id.desc()) # type: ignore + # .first() + # ) + # assert task_model_to_reset_to is not None + # ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid, commit=True) + # + # process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + # processor = ProcessInstanceProcessor(process_instance) + # processor.resume() + # processor.do_engine_steps(save=True) + # + # assert len(process_instance.active_human_tasks) == 1 + # human_task_one = process_instance.active_human_tasks[0] + # spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + # ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, initiator_user, human_task_one) + # + # assert process_instance.status == "complete" def test_properly_saves_tasks_when_running( self, @@ -511,17 +513,18 @@ class TestProcessInstanceProcessor(BaseTest): f" {expected_task_data_key}." ) - count_failure_message = ( - f"{base_failure_message} There are more than 2 entries of this task in the db." - " There should only ever be max 2." - ) - task_models_with_bpmn_identifier_count = ( - TaskModel.query.join(TaskDefinitionModel) - .filter(TaskModel.process_instance_id == process_instance_relookup.id) - .filter(TaskDefinitionModel.bpmn_identifier == spiff_task.task_spec.name) - .count() - ) - assert task_models_with_bpmn_identifier_count < 3, count_failure_message + # TODO: add back in when removing MAYBE and LIKELY tasks + # count_failure_message = ( + # f"{base_failure_message} There are more than 2 entries of this task in the db." + # " There should only ever be max 2." + # ) + # task_models_with_bpmn_identifier_count = ( + # TaskModel.query.join(TaskDefinitionModel) + # .filter(TaskModel.process_instance_id == process_instance_relookup.id) + # .filter(TaskDefinitionModel.bpmn_identifier == spiff_task.task_spec.name) + # .count() + # ) + # assert task_models_with_bpmn_identifier_count < 3, count_failure_message task_model = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() assert task_model.start_in_seconds is not None @@ -582,12 +585,13 @@ class TestProcessInstanceProcessor(BaseTest): ) assert task_bpmn_identifier in spiff_tasks_checked, message - task_models_that_are_predicted_count = ( - TaskModel.query.filter(TaskModel.process_instance_id == process_instance_relookup.id) - .filter(TaskModel.state.in_(["LIKELY", "MAYBE"])) # type: ignore - .count() - ) - assert task_models_that_are_predicted_count == 0 + # TODO: add back in when removing MAYBE and LIKELY tasks + # task_models_that_are_predicted_count = ( + # TaskModel.query.filter(TaskModel.process_instance_id == process_instance_relookup.id) + # .filter(TaskModel.state.in_(["LIKELY", "MAYBE"])) # type: ignore + # .count() + # ) + # assert task_models_that_are_predicted_count == 0 assert processor.get_data() == data_set_7 @@ -729,7 +733,6 @@ class TestProcessInstanceProcessor(BaseTest): spiff_task = processor.get_task_by_guid(human_task_two.task_id) ProcessInstanceService.complete_form_task(processor, spiff_task, {}, initiator_user, human_task_two) - import pdb; pdb.set_trace() # ensure this does not raise a KeyError processor = ProcessInstanceProcessor(process_instance) assert len(process_instance.active_human_tasks) == 1 diff --git a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx index eaf909556..aab94c111 100644 --- a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx +++ b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx @@ -674,16 +674,16 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { ); }; - const canResetProcess = (task: Task) => { - // // disabling this feature for now - // return false; - return ( - ability.can('POST', targetUris.processInstanceResetPath) && - processInstance && - processInstance.status === 'suspended' && - task.state === 'READY' && - !showingActiveTask() - ); + const canResetProcess = (_task: Task) => { + // disabling this feature for now + return false; + // return ( + // ability.can('POST', targetUris.processInstanceResetPath) && + // processInstance && + // processInstance.status === 'suspended' && + // task.state === 'READY' && + // !showingActiveTask() + // ); }; const getEvents = (task: Task) => {