From 2128fad5bc8436f6a1f0279dc307286848b73f66 Mon Sep 17 00:00:00 2001
From: Dan
Date: Wed, 3 May 2023 17:08:22 -0400
Subject: [PATCH 1/2] Test and updates to ensure that when a task has a
 boundary event, you return to that event, and then progress one step, you
 don't get stuck with a task that can never be completed. Let SpiffWorkflow
 determine which tasks we need to update in the DB using the task_state_change
 date on the tasks.

---
 .../services/process_instance_processor.py   | 43 +++++----
 .../boundary_event_reset.bpmn                 | 51 +++++++++++
 .../boundary_event_reset/sub_with_timer.bpmn  | 89 +++++++++++++++++++
 .../unit/test_process_instance_processor.py   | 47 ++++++++++
 4 files changed, 212 insertions(+), 18 deletions(-)
 create mode 100644 spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn
 create mode 100644 spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn
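
Editor's note on the core bookkeeping change in manual_complete_task below: rather than hand-maintaining a spiff_tasks_updated dict at every place a task is touched, the patch records a timestamp up front and afterwards keeps any task whose last_state_change is newer. A minimal, self-contained sketch of that filtering idea, with TaskStub as a hypothetical stand-in for a SpiffWorkflow task:

# Sketch of the "collect updated tasks by timestamp" idea used in the hunk below.
# TaskStub is a hypothetical stand-in; the real code compares a SpiffWorkflow
# task's last_state_change against a time.time() snapshot taken at the start.
import time
from dataclasses import dataclass, field
from uuid import UUID, uuid4


@dataclass
class TaskStub:
    last_state_change: float
    id: UUID = field(default_factory=uuid4)


def tasks_updated_since(tasks: list[TaskStub], start_time: float) -> dict[UUID, TaskStub]:
    """Return tasks whose state changed after start_time, keyed by task id."""
    return {task.id: task for task in tasks if task.last_state_change > start_time}


if __name__ == "__main__":
    start_time = time.time()
    untouched = TaskStub(last_state_change=start_time - 60.0)
    touched = TaskStub(last_state_change=start_time + 0.5)
    assert list(tasks_updated_since([untouched, touched], start_time)) == [touched.id]
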
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 2ff469b3..4da0724f 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -1115,59 +1115,54 @@ class ProcessInstanceProcessor:

     def manual_complete_task(self, task_id: str, execute: bool) -> None:
         """Mark the task complete optionally executing it."""
-        spiff_tasks_updated = {}
         start_in_seconds = time.time()
         spiff_task = self.bpmn_process_instance.get_task_from_id(UUID(task_id))
         event_type = ProcessInstanceEventType.task_skipped.value
+        start_time = time.time()
+
         if execute:
             current_app.logger.info(
                 f"Manually executing Task {spiff_task.task_spec.name} of process"
                 f" instance {self.process_instance_model.id}"
             )
-            # Executing a subworkflow manually will restart its subprocess and allow stepping through it
+            # Executing a sub-workflow manually will restart its subprocess and allow stepping through it
             if isinstance(spiff_task.task_spec, SubWorkflowTask):
                 subprocess = self.bpmn_process_instance.get_subprocess(spiff_task)
                 # We have to get to the actual start event
-                for task in self.bpmn_process_instance.get_tasks(workflow=subprocess):
-                    task.complete()
-                    spiff_tasks_updated[task.id] = task
-                    if isinstance(task.task_spec, StartEvent):
+                for spiff_task in self.bpmn_process_instance.get_tasks(workflow=subprocess):
+                    spiff_task.run()
+                    if isinstance(spiff_task.task_spec, StartEvent):
                         break
             else:
-                spiff_task.complete()
-                spiff_tasks_updated[spiff_task.id] = spiff_task
-                for child in spiff_task.children:
-                    spiff_tasks_updated[child.id] = child
+                spiff_task.run()
             event_type = ProcessInstanceEventType.task_executed_manually.value
         else:
             spiff_logger = logging.getLogger("spiff")
             spiff_logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
-            spiff_task._set_state(TaskState.COMPLETED)
-            for child in spiff_task.children:
-                child.task_spec._update(child)
-                spiff_tasks_updated[child.id] = child
+            spiff_task.complete()

         spiff_task.workflow.last_task = spiff_task
-        spiff_tasks_updated[spiff_task.id] = spiff_task
-        end_in_seconds = time.time()

         if isinstance(spiff_task.task_spec, EndEvent):
             for task in self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK, workflow=spiff_task.workflow):
                 task.complete()
-                spiff_tasks_updated[task.id] = task

         # A subworkflow task will become ready when its workflow is complete. Engine steps would normally
         # then complete it, but we have to do it ourselves here.
         for task in self.bpmn_process_instance.get_tasks(TaskState.READY):
             if isinstance(task.task_spec, SubWorkflowTask):
                 task.complete()
-                spiff_tasks_updated[task.id] = task

         task_service = TaskService(
             process_instance=self.process_instance_model,
             serializer=self._serializer,
             bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
         )
+
+        spiff_tasks_updated = {}
+        for task in self.bpmn_process_instance.get_tasks():
+            if task.last_state_change > start_time:
+                spiff_tasks_updated[task.id] = task

         for updated_spiff_task in spiff_tasks_updated.values():
             (
                 bpmn_process,
@@ -1216,6 +1211,13 @@ class ProcessInstanceProcessor:
             raise TaskNotFoundError(
                 f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
             )
+        # If this task model has a parent boundary event, reset to that point instead, so we can reset all the boundary timers, etc...
+        parent_id = to_task_model.properties_json.get('parent','')
+        parent = TaskModel.query.filter_by(guid=parent_id).first()
+        is_boundary_parent = False
+        if parent and parent.task_definition.typename == '_BoundaryEventParent':
+            to_task_model = parent
+            is_boundary_parent = True
         # Will need to complete this task at the end so we are on the correct process.
         # NOTE: run ALL queries before making changes to ensure we get everything before anything changes
         parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes(
@@ -1320,6 +1322,11 @@ class ProcessInstanceProcessor:
         db.session.commit()

         processor = ProcessInstanceProcessor(process_instance)
+
+        # If this is a boundary event parent, run it, so we get back to an active task.
+        if is_boundary_parent:
+            processor.do_engine_steps(execution_strategy_name='one_at_a_time')
+
         processor.save()
         processor.suspend()
diff --git a/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn b/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn
new file mode 100644
index 00000000..01ce0c5a
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn
@@ -0,0 +1,51 @@
[BPMN XML for this new 51-line test model did not survive extraction; only element text remains: sequence flow ids Flow_1ist4rn, Flow_1xbry1g, Flow_0vzi07z.]
diff --git a/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn b/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn
new file mode 100644
index 00000000..adffceda
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn
@@ -0,0 +1,89 @@
[BPMN XML for this new 89-line test model did not survive extraction; only element text remains: sequence flow ids Flow_1e5apvr, Flow_110vf76, Flow_1hy0t7d, Flow_1xbdri7, Flow_0vtgres, and a timer boundary event duration of 'P14D'.]
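
Editor's note on the reset_process change above: when the requested reset target sits under a _BoundaryEventParent, the patch resets to that parent instead, so the boundary timers are re-armed, and then runs a single engine step to land back on an active task. A small sketch of just that target-selection decision, with hypothetical lookup callables standing in for the properties_json lookup and the TaskModel query in the real code:

# Sketch of the reset-target adjustment in reset_process above: if the task we
# are resetting to has a _BoundaryEventParent, reset to that parent instead so
# its boundary timers get re-armed. The two callables are hypothetical stand-ins
# for the properties_json 'parent' lookup and the TaskModel.task_definition query.
from typing import Callable, Optional, Tuple


def choose_reset_target(
    to_task_guid: str,
    parent_guid_of: Callable[[str], Optional[str]],
    typename_of: Callable[[str], Optional[str]],
) -> Tuple[str, bool]:
    """Return (guid_to_reset_to, is_boundary_parent)."""
    parent_guid = parent_guid_of(to_task_guid)
    if parent_guid and typename_of(parent_guid) == "_BoundaryEventParent":
        return parent_guid, True
    return to_task_guid, False


if __name__ == "__main__":
    parents = {"manual_task_guid": "boundary_parent_guid"}
    typenames = {"boundary_parent_guid": "_BoundaryEventParent"}
    assert choose_reset_target("manual_task_guid", parents.get, typenames.get) == (
        "boundary_parent_guid",
        True,
    )
    assert choose_reset_target("plain_task_guid", parents.get, typenames.get) == (
        "plain_task_guid",
        False,
    )
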
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
index bfda1eb3..2495a6b5 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
@@ -434,6 +434,53 @@ class TestProcessInstanceProcessor(BaseTest):

         assert process_instance.status == "complete"

+    def test_properly_resets_process_on_tasks_with_boundary_events(
+            self,
+            app: Flask,
+            client: FlaskClient,
+            with_db_and_bpmn_file_cleanup: None,
+            with_super_admin_user: UserModel,
+    ) -> None:
+        self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
+        process_model = load_test_spec(
+            process_model_id="test_group/boundary_event_reset",
+            process_model_source_directory="boundary_event_reset",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=with_super_admin_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+        assert len(process_instance.active_human_tasks) == 1
+        human_task_one = process_instance.active_human_tasks[0]
+        spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
+        ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, with_super_admin_user, human_task_one)
+        assert (
+                len(process_instance.active_human_tasks) == 1
+        ), "expected 1 active human tasks after 2nd one is completed"
+        assert process_instance.active_human_tasks[0].task_title == 'Final'
+
+        # Reset the process back to the task within the call activity that contains a timer boundary event.
+        reset_to_spiff_task = processor.__class__.get_task_by_bpmn_identifier(
+            'manual_task_1', processor.bpmn_process_instance
+        )
+        processor.suspend()
+        processor = ProcessInstanceProcessor(process_instance)
+        ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))
+        human_task_one = process_instance.active_human_tasks[0]
+        assert human_task_one.task_title == 'Manual Task #1'
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.manual_complete_task(str(spiff_manual_task.id), execute=True)
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.resume()
+        processor.do_engine_steps(save=True)
+        process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
+
+        assert (len(process_instance.active_human_tasks) == 1)
+        assert process_instance.active_human_tasks[0].task_title == 'Final', \
+            "once we reset, resume, and complete the task, we should be back to the Final step again, and not " \
+            "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
+
     def test_properly_saves_tasks_when_running(
         self,
         app: Flask,

From 3894ec264f4b23b42d0da2d0e2773ecf94e1dc46 Mon Sep 17 00:00:00 2001
From: Dan
Date: Wed, 3 May 2023 17:29:33 -0400
Subject: [PATCH 2/2] run_pyl

---
 .../spiffworkflow_backend/config/__init__.py  | 12 +++----
 .../models/process_instance.py                |  6 ++--
 .../services/process_instance_processor.py    | 21 +++++++------
 .../unit/test_process_instance_processor.py   | 31 ++++++++++---------
 4 files changed, 37 insertions(+), 33 deletions(-)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
index 7711c36f..eaf67f6c 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py
@@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None:
     if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None:
         database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}"
         if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite":
-            app.config["SQLALCHEMY_DATABASE_URI"] = (
-                f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
-            )
+            app.config[
+                "SQLALCHEMY_DATABASE_URI"
+            ] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3"
         elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
-            app.config["SQLALCHEMY_DATABASE_URI"] = (
-                f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
-            )
+            app.config[
+                "SQLALCHEMY_DATABASE_URI"
+            ] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}"
         else:
             # use pswd to trick flake8 with hardcoded passwords
             db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD")
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
index b3ab709d..303532af 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
@@ -129,9 +129,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
     def serialized_with_metadata(self) -> dict[str, Any]:
         process_instance_attributes = self.serialized
         process_instance_attributes["process_metadata"] = self.process_metadata
-        process_instance_attributes["process_model_with_diagram_identifier"] = (
-            self.process_model_with_diagram_identifier
-        )
+        process_instance_attributes[
+            "process_model_with_diagram_identifier"
+        ] = self.process_model_with_diagram_identifier
         return process_instance_attributes

     @property
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index a42abf4f..d2f53232 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -423,9 +423,9 @@ class ProcessInstanceProcessor:
         tld.process_instance_id = process_instance_model.id

         # we want this to be the fully qualified path to the process model including all group subcomponents
-        current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = (
-            f"{process_instance_model.process_model_identifier}"
-        )
+        current_app.config[
+            "THREAD_LOCAL_DATA"
+        ].process_model_identifier = f"{process_instance_model.process_model_identifier}"

         self.process_instance_model = process_instance_model
         self.process_model_service = ProcessModelService()
@@ -585,9 +585,9 @@ class ProcessInstanceProcessor:
                 bpmn_subprocess_definition.bpmn_identifier
             ] = bpmn_process_definition_dict
             spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
-            bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
-                bpmn_subprocess_definition.bpmn_identifier
-            )
+            bpmn_subprocess_definition_bpmn_identifiers[
+                bpmn_subprocess_definition.id
+            ] = bpmn_subprocess_definition.bpmn_identifier

         task_definitions = TaskDefinitionModel.query.filter(
             TaskDefinitionModel.bpmn_process_definition_id.in_(  # type: ignore
@@ -1211,11 +1211,12 @@ class ProcessInstanceProcessor:
             raise TaskNotFoundError(
                 f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'"
             )
-        # If this task model has a parent boundary event, reset to that point instead, so we can reset all the boundary timers, etc...
+        # If this task model has a parent boundary event, reset to that point instead,
+        # so we can reset all the boundary timers, etc...
+        parent_id = to_task_model.properties_json.get("parent", "")
         parent = TaskModel.query.filter_by(guid=parent_id).first()
         is_boundary_parent = False
-        if parent and parent.task_definition.typename == '_BoundaryEventParent':
+        if parent and parent.task_definition.typename == "_BoundaryEventParent":
             to_task_model = parent
             is_boundary_parent = True
         # Will need to complete this task at the end so we are on the correct process.
@@ -1325,7 +1326,7 @@ class ProcessInstanceProcessor:

         # If this is a boundary event parent, run it, so we get back to an active task.
         if is_boundary_parent:
-            processor.do_engine_steps(execution_strategy_name='one_at_a_time')
+            processor.do_engine_steps(execution_strategy_name="one_at_a_time")

         processor.save()
         processor.suspend()
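
Editor's note on the one_at_a_time call re-quoted just above: after resetting to a _BoundaryEventParent, the first patch runs the engine with that strategy so, per its own comment, the instance "gets back to an active task" instead of stopping on the synthetic boundary parent. A toy illustration of that single-step idea, not the backend's actual execution-strategy API:

# Toy illustration of "advance exactly one ready task" after a reset, mirroring
# the intent of do_engine_steps(execution_strategy_name="one_at_a_time") above.
# run_one_step and the task names are hypothetical, not spiffworkflow-backend APIs.
from collections import deque
from typing import Callable, Deque


def run_one_step(ready_tasks: Deque[str], complete: Callable[[str], None]) -> str:
    """Complete exactly one ready task and return its name."""
    task_name = ready_tasks.popleft()
    complete(task_name)
    return task_name


if __name__ == "__main__":
    completed: list[str] = []
    ready: Deque[str] = deque(["boundary_event_parent_of_manual_task_1", "manual_task_1"])
    assert run_one_step(ready, completed.append) == "boundary_event_parent_of_manual_task_1"
    assert list(ready) == ["manual_task_1"]  # the manual task is now the active one
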
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
index 2495a6b5..92e19919 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
@@ -435,11 +435,11 @@ class TestProcessInstanceProcessor(BaseTest):
         assert process_instance.status == "complete"

     def test_properly_resets_process_on_tasks_with_boundary_events(
-            self,
-            app: Flask,
-            client: FlaskClient,
-            with_db_and_bpmn_file_cleanup: None,
-            with_super_admin_user: UserModel,
+        self,
+        app: Flask,
+        client: FlaskClient,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
     ) -> None:
         self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
         process_model = load_test_spec(
@@ -454,21 +454,23 @@ class TestProcessInstanceProcessor(BaseTest):
         assert len(process_instance.active_human_tasks) == 1
         human_task_one = process_instance.active_human_tasks[0]
         spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id))
-        ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, with_super_admin_user, human_task_one)
+        ProcessInstanceService.complete_form_task(
+            processor, spiff_manual_task, {}, with_super_admin_user, human_task_one
+        )
         assert (
-                len(process_instance.active_human_tasks) == 1
+            len(process_instance.active_human_tasks) == 1
         ), "expected 1 active human tasks after 2nd one is completed"
-        assert process_instance.active_human_tasks[0].task_title == 'Final'
+        assert process_instance.active_human_tasks[0].task_title == "Final"

         # Reset the process back to the task within the call activity that contains a timer boundary event.
-        reset_to_spiff_task = processor.__class__.get_task_by_bpmn_identifier(
-            'manual_task_1', processor.bpmn_process_instance
+        reset_to_spiff_task: SpiffTask = processor.__class__.get_task_by_bpmn_identifier(
+            "manual_task_1", processor.bpmn_process_instance
         )
         processor.suspend()
         processor = ProcessInstanceProcessor(process_instance)
         ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))
         human_task_one = process_instance.active_human_tasks[0]
-        assert human_task_one.task_title == 'Manual Task #1'
+        assert human_task_one.task_title == "Manual Task #1"
         processor = ProcessInstanceProcessor(process_instance)
         processor.manual_complete_task(str(spiff_manual_task.id), execute=True)
         processor = ProcessInstanceProcessor(process_instance)
         processor.resume()
         processor.do_engine_steps(save=True)
         process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()

-        assert (len(process_instance.active_human_tasks) == 1)
-        assert process_instance.active_human_tasks[0].task_title == 'Final', \
-            "once we reset, resume, and complete the task, we should be back to the Final step again, and not " \
-            "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
+        assert len(process_instance.active_human_tasks) == 1
+        assert process_instance.active_human_tasks[0].task_title == "Final", (
+            "once we reset, resume, and complete the task, we should be back to the Final step again, and not "
+            "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
+        )

     def test_properly_saves_tasks_when_running(
         self,
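
Editor's closing note on the fixture the new test drives: the sub_with_timer boundary event uses the ISO-8601 duration 'P14D' (14 days), so the timer is not expected to fire while the test runs; the point of resetting to the boundary-event parent is only that the timer gets re-armed rather than leaving the task stuck. A tiny, self-contained illustration of what that duration string denotes, handling only the whole-days form used by the fixture:

# 'P14D' is an ISO-8601 duration meaning 14 days. This helper handles only the
# whole-days form used by the sub_with_timer fixture; real BPMN engines accept
# the full ISO-8601 duration grammar.
import re
from datetime import timedelta


def parse_whole_days_duration(value: str) -> timedelta:
    match = re.fullmatch(r"P(\d+)D", value)
    if match is None:
        raise ValueError(f"not a whole-days ISO-8601 duration: {value!r}")
    return timedelta(days=int(match.group(1)))


if __name__ == "__main__":
    assert parse_whole_days_duration("P14D") == timedelta(days=14)
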