Merge pull request #213 from sartography/feature/recursive_run_to_greedy_strategy

moved the recursive run task method to the greedy strategy so not all…
This commit is contained in:
jasquat 2023-04-14 14:24:22 -04:00 committed by GitHub
commit 3e0af7ac3f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 36 deletions

View File

@ -1642,7 +1642,7 @@ class ProcessInstanceProcessor:
self.save,
)
try:
execution_service.run_until_user_input_required_and_save(exit_at, save)
execution_service.run(exit_at, save)
finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable.

View File

@ -212,12 +212,27 @@ class GreedyExecutionStrategy(ExecutionStrategy):
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
self.bpmn_process_instance = bpmn_process_instance
bpmn_process_instance.do_engine_steps(
self.run_until_user_input_required(exit_at)
self.delegate.after_engine_steps(bpmn_process_instance)
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=self.delegate.will_complete_task,
did_complete_task=self.delegate.did_complete_task,
)
self.delegate.after_engine_steps(bpmn_process_instance)
self.bpmn_process_instance.refresh_waiting_tasks()
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
non_human_waiting_task = next(
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
)
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@ -288,11 +303,10 @@ class WorkflowExecutionService:
# names of methods that do spiff stuff:
# processor.do_engine_steps calls:
# run_until_user_input_required_and_save
# run_until_user_input_required
# execution_strategy.spiff_run
# spiff.do_engine_steps
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
# run
# execution_strategy.spiff_run
# spiff.[some_run_task_method]
def run(self, exit_at: None = None, save: bool = False) -> None:
"""Do_engine_steps."""
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
if tripped:
@ -302,8 +316,16 @@ class WorkflowExecutionService:
)
try:
self.run_until_user_input_required(exit_at)
self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued`
self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except SpiffWorkflowException as swe:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
@ -314,31 +336,6 @@ class WorkflowExecutionService:
if save:
self.process_instance_saver()
def run_until_user_input_required(self, exit_at: None = None) -> None:
"""Keeps running tasks until there are no non-human READY tasks.
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued`
self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at)
if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
self.bpmn_process_instance.refresh_waiting_tasks()
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
non_human_waiting_task = next(
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
)
# non_human_waiting_task = next((p for p in ready_tasks), None)
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
def process_bpmn_messages(self) -> None:
"""Process_bpmn_messages."""
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
@ -410,11 +407,11 @@ class WorkflowExecutionService:
class ProfiledWorkflowExecutionService(WorkflowExecutionService):
"""A profiled version of the workflow execution service."""
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
def run(self, exit_at: None = None, save: bool = False) -> None:
"""__do_engine_steps."""
import cProfile
from pstats import SortKey
with cProfile.Profile() as pr:
super().run_until_user_input_required_and_save(exit_at=exit_at, save=save)
super().run(exit_at=exit_at, save=save)
pr.print_stats(sort=SortKey.CUMULATIVE)