From 424894b5aea47888cb1590b5fad156469e1fe56e Mon Sep 17 00:00:00 2001 From: Dan Date: Wed, 3 May 2023 17:08:22 -0400 Subject: [PATCH 1/2] Test and updates to assure that when a task has a boundary event, and you return to that event, and then progress one step, you don't get stuck with a task that can't ever be completed. Let SpiffWorkflow determine what tasks we need to update in the DB using the task_state_change date on the tasks. --- .../services/process_instance_processor.py | 43 +++++---- .../boundary_event_reset.bpmn | 51 +++++++++++ .../boundary_event_reset/sub_with_timer.bpmn | 89 +++++++++++++++++++ .../unit/test_process_instance_processor.py | 47 ++++++++++ 4 files changed, 212 insertions(+), 18 deletions(-) create mode 100644 spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn create mode 100644 spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 2ff469b3f..4da0724f6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1115,59 +1115,54 @@ class ProcessInstanceProcessor: def manual_complete_task(self, task_id: str, execute: bool) -> None: """Mark the task complete optionally executing it.""" - spiff_tasks_updated = {} start_in_seconds = time.time() spiff_task = self.bpmn_process_instance.get_task_from_id(UUID(task_id)) event_type = ProcessInstanceEventType.task_skipped.value + start_time = time.time() + if execute: current_app.logger.info( f"Manually executing Task {spiff_task.task_spec.name} of process" f" instance {self.process_instance_model.id}" ) - # Executing a subworkflow manually will restart its subprocess and allow stepping through it + # Executing a sub-workflow manually will restart its subprocess and allow stepping through it if isinstance(spiff_task.task_spec, SubWorkflowTask): subprocess = self.bpmn_process_instance.get_subprocess(spiff_task) # We have to get to the actual start event - for task in self.bpmn_process_instance.get_tasks(workflow=subprocess): - task.complete() - spiff_tasks_updated[task.id] = task - if isinstance(task.task_spec, StartEvent): + for spiff_task in self.bpmn_process_instance.get_tasks(workflow=subprocess): + spiff_task.run() + if isinstance(spiff_task.task_spec, StartEvent): break else: - spiff_task.complete() - spiff_tasks_updated[spiff_task.id] = spiff_task - for child in spiff_task.children: - spiff_tasks_updated[child.id] = child + spiff_task.run() event_type = ProcessInstanceEventType.task_executed_manually.value else: spiff_logger = logging.getLogger("spiff") spiff_logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info()) - spiff_task._set_state(TaskState.COMPLETED) - for child in spiff_task.children: - child.task_spec._update(child) - spiff_tasks_updated[child.id] = child + spiff_task.complete() spiff_task.workflow.last_task = spiff_task - spiff_tasks_updated[spiff_task.id] = spiff_task - end_in_seconds = time.time() if isinstance(spiff_task.task_spec, EndEvent): for task in self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK, workflow=spiff_task.workflow): task.complete() - spiff_tasks_updated[task.id] = task # A subworkflow task will become ready when its workflow is complete. Engine steps would normally # then complete it, but we have to do it ourselves here. for task in self.bpmn_process_instance.get_tasks(TaskState.READY): if isinstance(task.task_spec, SubWorkflowTask): task.complete() - spiff_tasks_updated[task.id] = task task_service = TaskService( process_instance=self.process_instance_model, serializer=self._serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) + + spiff_tasks_updated = {} + for task in self.bpmn_process_instance.get_tasks(): + if task.last_state_change > start_time: + spiff_tasks_updated[task.id] = task for updated_spiff_task in spiff_tasks_updated.values(): ( bpmn_process, @@ -1216,6 +1211,13 @@ class ProcessInstanceProcessor: raise TaskNotFoundError( f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" ) + # If this task model has a parent boundary event, reset to that point instead, so we can reset all the boundary timers, etc... + parent_id = to_task_model.properties_json.get('parent','') + parent = TaskModel.query.filter_by(guid=parent_id).first() + is_boundary_parent = False + if parent and parent.task_definition.typename == '_BoundaryEventParent': + to_task_model = parent + is_boundary_parent = True # Will need to complete this task at the end so we are on the correct process. # NOTE: run ALL queries before making changes to ensure we get everything before anything changes parent_bpmn_processes, task_models_of_parent_bpmn_processes = TaskService.task_models_of_parent_bpmn_processes( @@ -1320,6 +1322,11 @@ class ProcessInstanceProcessor: db.session.commit() processor = ProcessInstanceProcessor(process_instance) + + # If this as a boundary event parent, run it, so we get back to an active task. + if is_boundary_parent: + processor.do_engine_steps(execution_strategy_name='one_at_a_time') + processor.save() processor.suspend() diff --git a/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn b/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn new file mode 100644 index 000000000..01ce0c5a9 --- /dev/null +++ b/spiffworkflow-backend/tests/data/boundary_event_reset/boundary_event_reset.bpmn @@ -0,0 +1,51 @@ + + + + + Flow_1ist4rn + + + + Flow_1xbry1g + + + + Flow_1ist4rn + Flow_0vzi07z + + + + Flow_0vzi07z + Flow_1xbry1g + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn b/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn new file mode 100644 index 000000000..adffceda2 --- /dev/null +++ b/spiffworkflow-backend/tests/data/boundary_event_reset/sub_with_timer.bpmn @@ -0,0 +1,89 @@ + + + + + Flow_1e5apvr + + + + + Flow_110vf76 + + + + Flow_1hy0t7d + + 'P14D' + + + + + Flow_1xbdri7 + + + + Flow_1e5apvr + Flow_0vtgres + + + Flow_1hy0t7d + Flow_1xbdri7 + + + Flow_0vtgres + Flow_110vf76 + y='1000' + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index bfda1eb34..2495a6b51 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -434,6 +434,53 @@ class TestProcessInstanceProcessor(BaseTest): assert process_instance.status == "complete" + def test_properly_resets_process_on_tasks_with_boundary_events( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") + process_model = load_test_spec( + process_model_id="test_group/boundary_event_reset", + process_model_source_directory="boundary_event_reset", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=with_super_admin_user + ) + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + assert len(process_instance.active_human_tasks) == 1 + human_task_one = process_instance.active_human_tasks[0] + spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) + ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, with_super_admin_user, human_task_one) + assert ( + len(process_instance.active_human_tasks) == 1 + ), "expected 1 active human tasks after 2nd one is completed" + assert process_instance.active_human_tasks[0].task_title == 'Final' + + # Reset the process back to the task within the call activity that contains a timer_boundary event. + reset_to_spiff_task = processor.__class__.get_task_by_bpmn_identifier( + 'manual_task_1', processor.bpmn_process_instance + ) + processor.suspend() + processor = ProcessInstanceProcessor(process_instance) + ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id)) + human_task_one = process_instance.active_human_tasks[0] + assert human_task_one.task_title == 'Manual Task #1' + processor = ProcessInstanceProcessor(process_instance) + processor.manual_complete_task(str(spiff_manual_task.id), execute=True) + processor = ProcessInstanceProcessor(process_instance) + processor.resume() + processor.do_engine_steps(save=True) + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + + assert (len(process_instance.active_human_tasks) == 1) + assert process_instance.active_human_tasks[0].task_title == 'Final', \ + "once we reset, resume, and complete the task, we should be back to the Final step again, and not" \ + "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)" + def test_properly_saves_tasks_when_running( self, app: Flask, From fc7d3c39077cc8e5e40a808f14193d1926b29b2b Mon Sep 17 00:00:00 2001 From: Dan Date: Wed, 3 May 2023 17:29:33 -0400 Subject: [PATCH 2/2] run_pyl --- .../spiffworkflow_backend/config/__init__.py | 12 +++---- .../models/process_instance.py | 6 ++-- .../services/process_instance_processor.py | 21 +++++++------ .../unit/test_process_instance_processor.py | 31 ++++++++++--------- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py index 7711c36f9..eaf67f6c9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/__init__.py @@ -18,13 +18,13 @@ def setup_database_uri(app: Flask) -> None: if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_URI") is None: database_name = f"spiffworkflow_backend_{app.config['ENV_IDENTIFIER']}" if app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "sqlite": - app.config["SQLALCHEMY_DATABASE_URI"] = ( - f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3" - ) + app.config[ + "SQLALCHEMY_DATABASE_URI" + ] = f"sqlite:///{app.instance_path}/db_{app.config['ENV_IDENTIFIER']}.sqlite3" elif app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres": - app.config["SQLALCHEMY_DATABASE_URI"] = ( - f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}" - ) + app.config[ + "SQLALCHEMY_DATABASE_URI" + ] = f"postgresql://spiffworkflow_backend:spiffworkflow_backend@localhost:5432/{database_name}" else: # use pswd to trick flake8 with hardcoded passwords db_pswd = app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_PASSWORD") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index b3ab709df..303532af5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -129,9 +129,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): def serialized_with_metadata(self) -> dict[str, Any]: process_instance_attributes = self.serialized process_instance_attributes["process_metadata"] = self.process_metadata - process_instance_attributes["process_model_with_diagram_identifier"] = ( - self.process_model_with_diagram_identifier - ) + process_instance_attributes[ + "process_model_with_diagram_identifier" + ] = self.process_model_with_diagram_identifier return process_instance_attributes @property diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index a42abf4f2..d2f532327 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -423,9 +423,9 @@ class ProcessInstanceProcessor: tld.process_instance_id = process_instance_model.id # we want this to be the fully qualified path to the process model including all group subcomponents - current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = ( - f"{process_instance_model.process_model_identifier}" - ) + current_app.config[ + "THREAD_LOCAL_DATA" + ].process_model_identifier = f"{process_instance_model.process_model_identifier}" self.process_instance_model = process_instance_model self.process_model_service = ProcessModelService() @@ -585,9 +585,9 @@ class ProcessInstanceProcessor: bpmn_subprocess_definition.bpmn_identifier ] = bpmn_process_definition_dict spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {} - bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = ( - bpmn_subprocess_definition.bpmn_identifier - ) + bpmn_subprocess_definition_bpmn_identifiers[ + bpmn_subprocess_definition.id + ] = bpmn_subprocess_definition.bpmn_identifier task_definitions = TaskDefinitionModel.query.filter( TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore @@ -1211,11 +1211,12 @@ class ProcessInstanceProcessor: raise TaskNotFoundError( f"Cannot find a task with guid '{to_task_guid}' for process instance '{process_instance.id}'" ) - # If this task model has a parent boundary event, reset to that point instead, so we can reset all the boundary timers, etc... - parent_id = to_task_model.properties_json.get('parent','') + # If this task model has a parent boundary event, reset to that point instead, + # so we can reset all the boundary timers, etc... + parent_id = to_task_model.properties_json.get("parent", "") parent = TaskModel.query.filter_by(guid=parent_id).first() is_boundary_parent = False - if parent and parent.task_definition.typename == '_BoundaryEventParent': + if parent and parent.task_definition.typename == "_BoundaryEventParent": to_task_model = parent is_boundary_parent = True # Will need to complete this task at the end so we are on the correct process. @@ -1325,7 +1326,7 @@ class ProcessInstanceProcessor: # If this as a boundary event parent, run it, so we get back to an active task. if is_boundary_parent: - processor.do_engine_steps(execution_strategy_name='one_at_a_time') + processor.do_engine_steps(execution_strategy_name="one_at_a_time") processor.save() processor.suspend() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 2495a6b51..92e19919a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -435,11 +435,11 @@ class TestProcessInstanceProcessor(BaseTest): assert process_instance.status == "complete" def test_properly_resets_process_on_tasks_with_boundary_events( - self, - app: Flask, - client: FlaskClient, - with_db_and_bpmn_file_cleanup: None, - with_super_admin_user: UserModel, + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, ) -> None: self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group") process_model = load_test_spec( @@ -454,21 +454,23 @@ class TestProcessInstanceProcessor(BaseTest): assert len(process_instance.active_human_tasks) == 1 human_task_one = process_instance.active_human_tasks[0] spiff_manual_task = processor.bpmn_process_instance.get_task_from_id(UUID(human_task_one.task_id)) - ProcessInstanceService.complete_form_task(processor, spiff_manual_task, {}, with_super_admin_user, human_task_one) + ProcessInstanceService.complete_form_task( + processor, spiff_manual_task, {}, with_super_admin_user, human_task_one + ) assert ( - len(process_instance.active_human_tasks) == 1 + len(process_instance.active_human_tasks) == 1 ), "expected 1 active human tasks after 2nd one is completed" - assert process_instance.active_human_tasks[0].task_title == 'Final' + assert process_instance.active_human_tasks[0].task_title == "Final" # Reset the process back to the task within the call activity that contains a timer_boundary event. - reset_to_spiff_task = processor.__class__.get_task_by_bpmn_identifier( - 'manual_task_1', processor.bpmn_process_instance + reset_to_spiff_task: SpiffTask = processor.__class__.get_task_by_bpmn_identifier( + "manual_task_1", processor.bpmn_process_instance ) processor.suspend() processor = ProcessInstanceProcessor(process_instance) ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id)) human_task_one = process_instance.active_human_tasks[0] - assert human_task_one.task_title == 'Manual Task #1' + assert human_task_one.task_title == "Manual Task #1" processor = ProcessInstanceProcessor(process_instance) processor.manual_complete_task(str(spiff_manual_task.id), execute=True) processor = ProcessInstanceProcessor(process_instance) @@ -476,10 +478,11 @@ class TestProcessInstanceProcessor(BaseTest): processor.do_engine_steps(save=True) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - assert (len(process_instance.active_human_tasks) == 1) - assert process_instance.active_human_tasks[0].task_title == 'Final', \ - "once we reset, resume, and complete the task, we should be back to the Final step again, and not" \ + assert len(process_instance.active_human_tasks) == 1 + assert process_instance.active_human_tasks[0].task_title == "Final", ( + "once we reset, resume, and complete the task, we should be back to the Final step again, and not" "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)" + ) def test_properly_saves_tasks_when_running( self,