diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 7b04cd4c..885dda50 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1642,7 +1642,7 @@ class ProcessInstanceProcessor: self.save, ) try: - execution_service.run_until_user_input_required_and_save(exit_at, save) + execution_service.run(exit_at, save) finally: # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the # script engine on a class variable. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 4f3fba38..5764cc89 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -212,12 +212,27 @@ class GreedyExecutionStrategy(ExecutionStrategy): def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: self.bpmn_process_instance = bpmn_process_instance - bpmn_process_instance.do_engine_steps( + self.run_until_user_input_required(exit_at) + self.delegate.after_engine_steps(bpmn_process_instance) + + def run_until_user_input_required(self, exit_at: None = None) -> None: + """Keeps running tasks until there are no non-human READY tasks. + + spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY. + """ + self.bpmn_process_instance.do_engine_steps( exit_at=exit_at, will_complete_task=self.delegate.will_complete_task, did_complete_task=self.delegate.did_complete_task, ) - self.delegate.after_engine_steps(bpmn_process_instance) + + self.bpmn_process_instance.refresh_waiting_tasks() + ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY) + non_human_waiting_task = next( + (p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None + ) + if non_human_waiting_task is not None: + self.run_until_user_input_required(exit_at) class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): @@ -288,11 +303,10 @@ class WorkflowExecutionService: # names of methods that do spiff stuff: # processor.do_engine_steps calls: - # run_until_user_input_required_and_save - # run_until_user_input_required - # execution_strategy.spiff_run - # spiff.do_engine_steps - def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None: + # run + # execution_strategy.spiff_run + # spiff.[some_run_task_method] + def run(self, exit_at: None = None, save: bool = False) -> None: """Do_engine_steps.""" with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: if tripped: @@ -302,8 +316,16 @@ class WorkflowExecutionService: ) try: - self.run_until_user_input_required(exit_at) + self.bpmn_process_instance.refresh_waiting_tasks() + # TODO: implicit re-entrant locks here `with_dequeued` + self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at) + + if self.bpmn_process_instance.is_completed(): + self.process_instance_completer(self.bpmn_process_instance) + + self.process_bpmn_messages() + self.queue_waiting_receive_messages() except SpiffWorkflowException as swe: raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe @@ -314,31 +336,6 @@ class WorkflowExecutionService: if save: self.process_instance_saver() - def run_until_user_input_required(self, exit_at: None = None) -> None: - """Keeps running tasks until there are no non-human READY tasks. - - spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY. - """ - self.bpmn_process_instance.refresh_waiting_tasks() - - # TODO: implicit re-entrant locks here `with_dequeued` - self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at) - - if self.bpmn_process_instance.is_completed(): - self.process_instance_completer(self.bpmn_process_instance) - - self.process_bpmn_messages() - self.queue_waiting_receive_messages() - - self.bpmn_process_instance.refresh_waiting_tasks() - ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY) - non_human_waiting_task = next( - (p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None - ) - # non_human_waiting_task = next((p for p in ready_tasks), None) - if non_human_waiting_task is not None: - self.run_until_user_input_required(exit_at) - def process_bpmn_messages(self) -> None: """Process_bpmn_messages.""" bpmn_messages = self.bpmn_process_instance.get_bpmn_messages() @@ -410,11 +407,11 @@ class WorkflowExecutionService: class ProfiledWorkflowExecutionService(WorkflowExecutionService): """A profiled version of the workflow execution service.""" - def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None: + def run(self, exit_at: None = None, save: bool = False) -> None: """__do_engine_steps.""" import cProfile from pstats import SortKey with cProfile.Profile() as pr: - super().run_until_user_input_required_and_save(exit_at=exit_at, save=save) + super().run(exit_at=exit_at, save=save) pr.print_stats(sort=SortKey.CUMULATIVE)