diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 9e8c6a83..2bae1059 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1569,6 +1569,7 @@ class ProcessInstanceProcessor: exit_at, save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, + # profile=True, ) self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings() self.check_all_tasks() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index daf0f369..4bd1e102 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -21,7 +21,8 @@ from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent # from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore -from SpiffWorkflow.util.task import TaskState # type: ignore +from SpiffWorkflow.util.task import TaskFilter # type: ignore +from SpiffWorkflow.util.task import TaskState from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( queue_future_task_if_appropriate, @@ -95,6 +96,10 @@ class EngineStepDelegate: def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: pass + @abstractmethod + def last_completed_spiff_task(self) -> SpiffTask | None: + pass + class ExecutionStrategy: """Interface of sorts for a concrete execution strategy.""" @@ -195,7 +200,23 @@ class ExecutionStrategy: self.delegate.add_object_to_db_session(bpmn_process_instance) def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: - return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual] + task_filter = TaskFilter(state=TaskState.READY, manual=False) + + steps = list( + bpmn_process_instance.get_tasks( + first_task=self.delegate.last_completed_spiff_task(), + task_filter=task_filter, + ) + ) + + if not steps: + steps = list( + bpmn_process_instance.get_tasks( + task_filter=task_filter, + ) + ) + + return steps def _run_engine_steps_with_threads( self, engine_steps: list[SpiffTask], process_instance: ProcessInstanceModel, user: UserModel | None @@ -261,7 +282,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.current_task_start_in_seconds: float | None = None - self.last_completed_spiff_task: SpiffTask | None = None + self._last_completed_spiff_task: SpiffTask | None = None self.spiff_tasks_to_process: set[UUID] = set() self.spiff_task_timestamps: dict[UUID, StartAndEndTimes] = {} @@ -293,7 +314,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): task_model.start_in_seconds = self.current_task_start_in_seconds task_model.end_in_seconds = time.time() - self.last_completed_spiff_task = spiff_task if ( spiff_task.task_spec.__class__.__name__ in ["StartEvent", "EndEvent", "IntermediateThrowEvent"] and spiff_task.task_spec.bpmn_name is not None @@ -306,6 +326,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): elif spiff_task.task_spec.__class__.__name__ == "StartEvent": self.process_instance.last_milestone_bpmn_name = "Started" self.process_instance.task_updated_at_in_seconds = round(time.time()) + self._last_completed_spiff_task = spiff_task if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) @@ -347,6 +368,9 @@ class TaskModelSavingDelegate(EngineStepDelegate): """No reason to save task model stuff if the process instance isn't persistent.""" return self.process_instance.persistence_level != "none" + def last_completed_spiff_task(self) -> SpiffTask | None: + return self._last_completed_spiff_task + class GreedyExecutionStrategy(ExecutionStrategy): """ @@ -471,6 +495,25 @@ class WorkflowExecutionService: exit_at: None = None, save: bool = False, should_schedule_waiting_timer_events: bool = True, + profile: bool = False, + ) -> TaskRunnability: + if profile: + import cProfile + from pstats import SortKey + + task_runnability = TaskRunnability.unknown_if_ready_tasks + with cProfile.Profile() as pr: + task_runnability = self._run_and_save(exit_at, save, should_schedule_waiting_timer_events) + pr.print_stats(sort=SortKey.CUMULATIVE) + return task_runnability + + return self._run_and_save(exit_at, save, should_schedule_waiting_timer_events) + + def _run_and_save( + self, + exit_at: None = None, + save: bool = False, + should_schedule_waiting_timer_events: bool = True, ) -> TaskRunnability: if self.process_instance_model.persistence_level != "none": with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: @@ -610,24 +653,3 @@ class WorkflowExecutionService: bpmn_process_correlations = self.bpmn_process_instance.correlations bpmn_process.properties_json["correlations"] = bpmn_process_correlations db.session.add(bpmn_process) - - -class ProfiledWorkflowExecutionService(WorkflowExecutionService): - """A profiled version of the workflow execution service.""" - - def run_and_save( - self, - exit_at: None = None, - save: bool = False, - should_schedule_waiting_timer_events: bool = True, - ) -> TaskRunnability: - import cProfile - from pstats import SortKey - - task_runnability = TaskRunnability.unknown_if_ready_tasks - with cProfile.Profile() as pr: - task_runnability = super().run_and_save( - exit_at=exit_at, save=save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events - ) - pr.print_stats(sort=SortKey.CUMULATIVE) - return task_runnability