diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py
index d5ba53dfc..910e2e27a 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py
@@ -3,7 +3,7 @@ from __future__ import annotations
 from dataclasses import dataclass
 
 from sqlalchemy import ForeignKey
-from sqlalchemy.orm import relationship
+from sqlalchemy.orm import relationship, backref
 
 from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
 from spiffworkflow_backend.models.db import db
@@ -38,6 +38,8 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
     json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
 
     tasks = relationship("TaskModel", back_populates="bpmn_process", cascade="delete")  # type: ignore
+    child_processes = relationship("BpmnProcessModel", foreign_keys=[direct_parent_process_id],
+                                   cascade="all")  # type: ignore
 
     # FIXME: find out how to set this but it'd be cool
     start_in_seconds: float = db.Column(db.DECIMAL(17, 6))
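For readers unfamiliar with self-referential SQLAlchemy relationships, here is a minimal, self-contained sketch of the pattern child_processes uses (a stand-in model, not the backend's actual one): because the foreign key lives on the child row, SQLAlchemy infers a one-to-many from parent to children, and cascade="all" makes deletes propagate.

# Stand-in model demonstrating a self-referential relationship with
# cascade="all": deleting a parent row also deletes its children.
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()

class Process(Base):
    __tablename__ = "process"
    id = Column(Integer, primary_key=True)
    direct_parent_process_id = Column(Integer, ForeignKey("process.id"))
    child_processes = relationship(
        "Process", foreign_keys=[direct_parent_process_id], cascade="all"
    )

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
with Session(engine) as session:
    parent = Process(child_processes=[Process(), Process()])
    session.add(parent)
    session.commit()
    session.delete(parent)  # cascade="all" includes delete, so children go too
    session.commit()
    assert session.query(Process).count() == 0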
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index c044228f7..19c296dd6 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -24,7 +24,7 @@ from uuid import UUID
 
 import dateparser
 import pytz
-from flask import current_app
+from flask import current_app, g
 from lxml import etree  # type: ignore
 from lxml.etree import XMLSyntaxError  # type: ignore
 from RestrictedPython import safe_globals  # type: ignore
@@ -415,9 +415,9 @@ class ProcessInstanceProcessor:
         tld.process_instance_id = process_instance_model.id
 
         # we want this to be the fully qualified path to the process model including all group subcomponents
-        current_app.config["THREAD_LOCAL_DATA"].process_model_identifier = (
-            f"{process_instance_model.process_model_identifier}"
-        )
+        current_app.config[
+            "THREAD_LOCAL_DATA"
+        ].process_model_identifier = f"{process_instance_model.process_model_identifier}"
 
         self.process_instance_model = process_instance_model
         self.process_model_service = ProcessModelService()
@@ -577,9 +577,9 @@ class ProcessInstanceProcessor:
                 bpmn_subprocess_definition.bpmn_identifier
             ] = bpmn_process_definition_dict
             spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition.bpmn_identifier]["task_specs"] = {}
-            bpmn_subprocess_definition_bpmn_identifiers[bpmn_subprocess_definition.id] = (
-                bpmn_subprocess_definition.bpmn_identifier
-            )
+            bpmn_subprocess_definition_bpmn_identifiers[
+                bpmn_subprocess_definition.id
+            ] = bpmn_subprocess_definition.bpmn_identifier
 
         task_definitions = TaskDefinitionModel.query.filter(
             TaskDefinitionModel.bpmn_process_definition_id.in_(  # type: ignore
@@ -1116,76 +1116,35 @@ class ProcessInstanceProcessor:
             event_type = ProcessInstanceEventType.task_skipped.value
 
         start_time = time.time()
-        if execute:
+        if spiff_task.task_spec.manual:
+            # Executing or not executing a human task results in the same state.
+            current_app.logger.info(
+                f"Manually skipping Human Task {spiff_task.task_spec.name} of process"
+                f" instance {self.process_instance_model.id}"
+            )
+            human_task = HumanTaskModel.query.filter_by(task_id=task_id).first()
+            self.complete_task(spiff_task, human_task=human_task, user=g.user)
+        elif execute:
             current_app.logger.info(
                 f"Manually executing Task {spiff_task.task_spec.name} of process"
                 f" instance {self.process_instance_model.id}"
            )
-            # Executing a sub-workflow manually will restart its subprocess and allow stepping through it
-            if isinstance(spiff_task.task_spec, SubWorkflowTask):
-                subprocess = self.bpmn_process_instance.get_subprocess(spiff_task)
-                # We have to get to the actual start event
-                for spiff_task in self.bpmn_process_instance.get_tasks(workflow=subprocess):
-                    spiff_task.run()
-                    if spiff_task.task_spec.__class__.__name__ == "StartEvent":
-                        break
-            else:
-                spiff_task.run()
-                event_type = ProcessInstanceEventType.task_executed_manually.value
+            self.do_engine_steps(save=True, execution_strategy_name="one_at_a_time")
         else:
-            spiff_logger = logging.getLogger("spiff")
-            spiff_logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
-            spiff_task.complete()
-            spiff_task.workflow.last_task = spiff_task
-        end_in_seconds = time.time()
-
-        if spiff_task.task_spec.__class__.__name__ == "EndEvent":
-            for task in self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK, workflow=spiff_task.workflow):
-                task.complete()
-
-        # A subworkflow task will become ready when its workflow is complete. Engine steps would normally
-        # then complete it, but we have to do it ourselves here.
-        for task in self.bpmn_process_instance.get_tasks(TaskState.READY):
-            if isinstance(task.task_spec, SubWorkflowTask):
-                task.complete()
+            current_app.logger.info(f"Skipped task {spiff_task.task_spec.name}", extra=spiff_task.log_info())
+            self.do_engine_steps(save=True, execution_strategy_name="skip_one")
 
+        spiff_tasks = self.bpmn_process_instance.get_tasks()
         task_service = TaskService(
             process_instance=self.process_instance_model,
             serializer=self._serializer,
             bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
         )
-
-        spiff_tasks_updated = {}
-        for task in self.bpmn_process_instance.get_tasks():
-            if task.last_state_change > start_time:
-                spiff_tasks_updated[task.id] = task
-        for updated_spiff_task in spiff_tasks_updated.values():
-            (
-                bpmn_process,
-                task_model,
-            ) = task_service.find_or_create_task_model_from_spiff_task(
-                updated_spiff_task,
-            )
-            bpmn_process_to_use = bpmn_process or task_model.bpmn_process
-            bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(
-                bpmn_process_to_use, updated_spiff_task.workflow.data
-            )
-            db.session.add(bpmn_process_to_use)
-            task_service.update_task_model(task_model, updated_spiff_task)
-            if bpmn_process_json_data is not None:
-                task_service.json_data_dicts[bpmn_process_json_data["hash"]] = bpmn_process_json_data
-
-            # spiff_task should be the main task we are completing and only it should get the timestamps
-            if task_model.guid == str(spiff_task.id):
-                task_model.start_in_seconds = start_in_seconds
-                task_model.end_in_seconds = end_in_seconds
-
-            task_service.task_models[task_model.guid] = task_model
-        task_service.save_objects_to_database()
-
+        task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, [], start_time)
         ProcessInstanceTmpService.add_event_to_process_instance(
             self.process_instance_model, event_type, task_guid=task_id
         )
+        self.save()  # Saving the workflow seems to reset the status
         self.suspend()
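The net effect of the manual_complete_task rework is easier to see stripped of logging and bookkeeping. A runnable toy of the same three-way dispatch, with stand-in classes (the real method also records events, saves, and re-suspends the instance):

# Illustrative only: human (manual) tasks go through complete_task so the
# HumanTaskModel row and the current user (g.user, which requires a Flask
# request context) are recorded; everything else is delegated to the engine
# via a named strategy instead of poking spiff_task.run()/complete() by hand.
from dataclasses import dataclass

@dataclass
class TaskSpec:
    manual: bool

@dataclass
class SpiffTask:
    task_spec: TaskSpec

def manual_complete(task: SpiffTask, execute: bool) -> str:
    if task.task_spec.manual:
        return "complete_task"
    elif execute:
        return "do_engine_steps(one_at_a_time)"
    else:
        return "do_engine_steps(skip_one)"

assert manual_complete(SpiffTask(TaskSpec(manual=True)), execute=False) == "complete_task"
assert manual_complete(SpiffTask(TaskSpec(manual=False)), execute=True) == "do_engine_steps(one_at_a_time)"
assert manual_complete(SpiffTask(TaskSpec(manual=False)), execute=False) == "do_engine_steps(skip_one)"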
@@ -1200,18 +1159,15 @@ class ProcessInstanceProcessor:
             process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
         )
         processor = ProcessInstanceProcessor(process_instance)
-        initial_spiff_tasks = processor.bpmn_process_instance.get_tasks()
-
-        processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
-        current_spiff_tasks = processor.bpmn_process_instance.get_tasks()
-        deleted_spiff_tasks = [t for t in initial_spiff_tasks if t not in current_spiff_tasks]
+        deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
+        spiff_tasks = processor.bpmn_process_instance.get_tasks()
 
         task_service = TaskService(
             process_instance=processor.process_instance_model,
             serializer=processor._serializer,
             bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
         )
-        task_service.update_all_tasks_from_spiff_tasks(current_spiff_tasks, deleted_spiff_tasks, start_time)
+        task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)
 
         # Save the process
         processor.save()
@@ -1611,7 +1567,9 @@ class ProcessInstanceProcessor:
         """Complete_task."""
         task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
         if task_model is None:
-            raise TaskNotFoundError(f"Cannot find a task with guid {human_task.task_id}")
+            raise TaskNotFoundError(
+                f"Cannot find a task with guid {human_task.task_id} for process instance"
+                f" {self.process_instance_model.id}"
+            )
         task_model.start_in_seconds = time.time()
         self.bpmn_process_instance.run_task_from_id(spiff_task.id)
@@ -1719,22 +1677,19 @@ class ProcessInstanceProcessor:
     def terminate(self) -> None:
         start_time = time.time()
-        initial_spiff_tasks = self.bpmn_process_instance.get_tasks()
-
-        self.bpmn_process_instance.cancel()
-        current_spiff_tasks = self.bpmn_process_instance.get_tasks()
-        deleted_spiff_tasks = [t for t in initial_spiff_tasks if t not in current_spiff_tasks]
+        deleted_tasks = self.bpmn_process_instance.cancel() or []
+        spiff_tasks = self.bpmn_process_instance.get_tasks()
 
         task_service = TaskService(
             process_instance=self.process_instance_model,
             serializer=self._serializer,
             bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
         )
-        task_service.update_all_tasks_from_spiff_tasks(current_spiff_tasks, deleted_spiff_tasks, start_time)
+        task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)
 
         # we may want to move this to task_service.update_all_tasks_from_spiff_tasks, but not sure it's always a good idea.
         # for cancelled tasks, spiff only returns tasks that were cancelled, not the ones that were deleted, so we have to find them
-        spiff_task_guids = [str(st.id) for st in current_spiff_tasks]
+        spiff_task_guids = [str(st.id) for st in spiff_tasks]
         tasks_no_longer_in_spiff = TaskModel.query.filter(
             and_(
                 TaskModel.process_instance_id == self.process_instance_model.id,
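Both rewrites above replace snapshot-diffing (get_tasks before and after, then a list comprehension) with the engine's own report of what it removed. A toy illustration of the pattern, assuming the engine returns exactly what it dropped (ToyWorkflow is a stand-in, not SpiffWorkflow):

# Why returning deleted items beats diffing snapshots: no O(n^2) membership
# tests, and nothing is missed if a task is dropped and re-created in between.
class ToyWorkflow:
    def __init__(self, tasks):
        self.tasks = list(tasks)

    def get_tasks(self):
        return list(self.tasks)

    def cancel(self):
        # Drop every task and report exactly what was dropped.
        deleted, self.tasks = self.tasks, []
        return deleted

wf = ToyWorkflow(["a", "b", "c"])
deleted_tasks = wf.cancel() or []  # the `or []` mirrors the guard in terminate()
assert deleted_tasks == ["a", "b", "c"]
assert wf.get_tasks() == []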
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index d48698b72..dc0415845 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -339,6 +339,17 @@ class OneAtATimeExecutionStrategy(ExecutionStrategy):
             self.delegate.did_complete_task(spiff_task)
         self.delegate.after_engine_steps(bpmn_process_instance)
 
 
+class SkipOneExecutionStrategy(ExecutionStrategy):
+    """When you want to skip over the next task, rather than execute it."""
+
+    def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
+        engine_steps = self.get_ready_engine_steps(bpmn_process_instance)
+        if len(engine_steps) > 0:
+            spiff_task = engine_steps[0]
+            self.delegate.will_complete_task(spiff_task)
+            spiff_task.complete()
+            self.delegate.did_complete_task(spiff_task)
+        self.delegate.after_engine_steps(bpmn_process_instance)
+
+
 def execution_strategy_named(
     name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader
@@ -348,6 +359,7 @@ def execution_strategy_named(
         "run_until_service_task": RunUntilServiceTaskExecutionStrategy,
         "run_until_user_message": RunUntilUserTaskOrMessageExecutionStrategy,
         "one_at_a_time": OneAtATimeExecutionStrategy,
+        "skip_one": SkipOneExecutionStrategy,
     }[name]
     return cls(delegate, spec_loader)  # type: ignore

execution_strategy_named is a plain dict lookup that instantiates the chosen class, so an unknown name fails fast with a KeyError. A minimal, runnable sketch of the same selection pattern with stand-in classes (the real strategies take a delegate and a spec loader, and operate on a BpmnWorkflow):

# Stand-in strategy classes illustrating the name -> class registry pattern.
from typing import Callable

class Strategy:
    def run(self, step: Callable[[], None]) -> None:
        raise NotImplementedError

class OneAtATime(Strategy):
    def run(self, step: Callable[[], None]) -> None:
        step()  # actually execute the next ready step

class SkipOne(Strategy):
    def run(self, step: Callable[[], None]) -> None:
        # deliberately do not call step(); the real SkipOneExecutionStrategy
        # calls spiff_task.complete() to advance the workflow without running it
        pass

def strategy_named(name: str) -> Strategy:
    return {"one_at_a_time": OneAtATime, "skip_one": SkipOne}[name]()

strategy_named("skip_one").run(lambda: print("never printed: the step is skipped"))
strategy_named("one_at_a_time").run(lambda: print("this step runs"))

diff --git a/spiffworkflow-backend/tests/data/step_through_gateway/step_through_gateway.bpmn b/spiffworkflow-backend/tests/data/step_through_gateway/step_through_gateway.bpmn
new file mode 100644
index 000000000..a1a9641ff
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/step_through_gateway/step_through_gateway.bpmn
@@ -0,0 +1,111 @@
[The 111 lines of new BPMN XML lost their markup in extraction and cannot be faithfully reproduced. What survives indicates a process with a start event, an initial task ("step_1", per the test below) that sets x = 1, an exclusive gateway ("Gateway_Open") with outgoing conditions x == 1 and x != 1, follow-on tasks, an end event, and sequence flows Flow_1eng1ar, Flow_0qv971t, Flow_085anct, Flow_18ufaed, Flow_1lm2vap, Flow_1wo0x2b, and Flow_0rh1j2k, plus the usual diagram layout section.]
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
index aca824ce9..307b63a95 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
@@ -485,6 +485,39 @@ class TestProcessInstanceProcessor(BaseTest):
             "stuck waiting for the call activity to complete (which was happening in a bug I'm fixing right now)"
         )
 
+    def test_step_through_gateway(
+        self,
+        app: Flask,
+        client: FlaskClient,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
+    ) -> None:
+        self.create_process_group_with_api(client, with_super_admin_user, "test_group", "test_group")
+        process_model = load_test_spec(
+            process_model_id="test_group/step_through_gateway",
+            process_model_source_directory="step_through_gateway",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=with_super_admin_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+        assert len(process_instance.active_human_tasks) == 1
+        human_task_one = process_instance.active_human_tasks[0]
+        processor.manual_complete_task(str(human_task_one.task_id), execute=True)
+        processor.save()
+        processor = ProcessInstanceProcessor(process_instance)
+        assert (
+            processor.get_task_by_bpmn_identifier("step_1", processor.bpmn_process_instance).state
+            == TaskState.COMPLETED
+        )
+        assert (
+            processor.get_task_by_bpmn_identifier("Gateway_Open", processor.bpmn_process_instance).state
+            == TaskState.READY
+        )
+
+        gateway_task = processor.bpmn_process_instance.get_tasks(TaskState.READY)[0]
+        processor.manual_complete_task(str(gateway_task.id), execute=True)
+        processor.save()
+        processor = ProcessInstanceProcessor(process_instance)
+        assert (
+            processor.get_task_by_bpmn_identifier("Gateway_Open", processor.bpmn_process_instance).state
+            == TaskState.COMPLETED
+        )
+
     def test_properly_saves_tasks_when_running(
         self,
         app: Flask,
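To run just the new test, something like the following should work from the spiffworkflow-backend directory (assuming the project's usual poetry-managed environment):

poetry run pytest tests/spiffworkflow_backend/unit/test_process_instance_processor.py -k test_step_through_gateway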