moved the recursive run task method to the greedy strategy so not all strategies have to do it

This commit is contained in:
jasquat 2023-04-14 14:13:42 -04:00
parent a1f68970c6
commit f53ba6b9d4
2 changed files with 33 additions and 36 deletions

View File

@ -1642,7 +1642,7 @@ class ProcessInstanceProcessor:
self.save, self.save,
) )
try: try:
execution_service.run_until_user_input_required_and_save(exit_at, save) execution_service.run(exit_at, save)
finally: finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable. # script engine on a class variable.

View File

@ -212,12 +212,27 @@ class GreedyExecutionStrategy(ExecutionStrategy):
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance self.bpmn_process_instance = bpmn_process_instance
bpmn_process_instance.do_engine_steps( self.run_until_user_input_required(exit_at)
self.delegate.after_engine_steps(bpmn_process_instance)
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at, exit_at=exit_at,
will_complete_task=self.delegate.will_complete_task, will_complete_task=self.delegate.will_complete_task,
did_complete_task=self.delegate.did_complete_task, did_complete_task=self.delegate.did_complete_task,
) )
self.delegate.after_engine_steps(bpmn_process_instance)
self.bpmn_process_instance.refresh_waiting_tasks()
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
non_human_waiting_task = next(
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
)
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@ -288,11 +303,10 @@ class WorkflowExecutionService:
# names of methods that do spiff stuff: # names of methods that do spiff stuff:
# processor.do_engine_steps calls: # processor.do_engine_steps calls:
# run_until_user_input_required_and_save # run
# run_until_user_input_required
# execution_strategy.spiff_run # execution_strategy.spiff_run
# spiff.do_engine_steps # spiff.[some_run_task_method]
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None: def run(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps.""" """Do_engine_steps."""
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped: with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped: if tripped:
@ -302,23 +316,6 @@ class WorkflowExecutionService:
) )
try: try:
self.run_until_user_input_required(exit_at)
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:
self.execution_strategy.save(self.bpmn_process_instance)
db.session.commit()
if save:
self.process_instance_saver()
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.refresh_waiting_tasks() self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued` # TODO: implicit re-entrant locks here `with_dequeued`
@ -329,15 +326,15 @@ class WorkflowExecutionService:
self.process_bpmn_messages() self.process_bpmn_messages()
self.queue_waiting_receive_messages() self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
self.bpmn_process_instance.refresh_waiting_tasks() finally:
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY) self.execution_strategy.save(self.bpmn_process_instance)
non_human_waiting_task = next( db.session.commit()
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
) if save:
# non_human_waiting_task = next((p for p in ready_tasks), None) self.process_instance_saver()
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
def process_bpmn_messages(self) -> None: def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages.""" """Process_bpmn_messages."""
@ -410,11 +407,11 @@ class WorkflowExecutionService:
class ProfiledWorkflowExecutionService(WorkflowExecutionService): class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service.""" """A profiled version of the workflow execution service."""
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None: def run(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps.""" """__do_engine_steps."""
import cProfile import cProfile
from pstats import SortKey from pstats import SortKey
with cProfile.Profile() as pr: with cProfile.Profile() as pr:
super().run_until_user_input_required_and_save(exit_at=exit_at, save=save) super().run(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE) pr.print_stats(sort=SortKey.CUMULATIVE)