Merge remote-tracking branch 'origin/main' into feature/prometheus-metrics
This commit is contained in:
commit
79a0505ddb
|
@ -1916,7 +1916,7 @@ lxml = "*"
|
|||
type = "git"
|
||||
url = "https://github.com/sartography/SpiffWorkflow"
|
||||
reference = "main"
|
||||
resolved_reference = "1c877dd768053b4cce4c4e14c92caa3216371751"
|
||||
resolved_reference = "98a1b37e01a00faea60025f517a89867b7261432"
|
||||
|
||||
[[package]]
|
||||
name = "sqlalchemy"
|
||||
|
|
|
@ -1649,7 +1649,7 @@ class ProcessInstanceProcessor:
|
|||
self.save,
|
||||
)
|
||||
try:
|
||||
execution_service.do_engine_steps(exit_at, save)
|
||||
execution_service.run_until_user_input_required_and_save(exit_at, save)
|
||||
finally:
|
||||
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
|
||||
# script engine on a class variable.
|
||||
|
@ -1839,7 +1839,7 @@ class ProcessInstanceProcessor:
|
|||
Return either the most recent task data or--if the process instance is complete--
|
||||
the process data.
|
||||
"""
|
||||
if self.process_instance_model.status == "complete":
|
||||
if self.process_instance_model.status == ProcessInstanceStatus.complete.value:
|
||||
return self.get_data()
|
||||
|
||||
most_recent_task = None
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
import copy
|
||||
import time
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
|
@ -138,9 +139,16 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
|||
| TaskState.LIKELY
|
||||
| TaskState.FUTURE
|
||||
):
|
||||
# these will be removed from the parent and then ignored
|
||||
if waiting_spiff_task._has_state(TaskState.PREDICTED_MASK):
|
||||
TaskService.remove_spiff_task_from_parent(waiting_spiff_task, self.task_service.task_models)
|
||||
continue
|
||||
|
||||
# removing elements from an array causes the loop to exit so deep copy the array first
|
||||
waiting_children = copy.copy(waiting_spiff_task.children)
|
||||
for waiting_child in waiting_children:
|
||||
if waiting_child._has_state(TaskState.PREDICTED_MASK):
|
||||
waiting_spiff_task.children.remove(waiting_child)
|
||||
|
||||
self.task_service.update_task_model_with_spiff_task(waiting_spiff_task)
|
||||
|
||||
# # NOTE: process-spiff-tasks-list: this would be the ideal way to handle all tasks
|
||||
|
@ -192,7 +200,7 @@ class ExecutionStrategy:
|
|||
"""__init__."""
|
||||
self.delegate = delegate
|
||||
|
||||
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
pass
|
||||
|
||||
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
|
@ -202,7 +210,7 @@ class ExecutionStrategy:
|
|||
class GreedyExecutionStrategy(ExecutionStrategy):
|
||||
"""The common execution strategy. This will greedily run all engine steps without stopping."""
|
||||
|
||||
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
self.bpmn_process_instance = bpmn_process_instance
|
||||
bpmn_process_instance.do_engine_steps(
|
||||
exit_at=exit_at,
|
||||
|
@ -219,7 +227,7 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
|
|||
return (to an interstitial page). The background processor would then take over.
|
||||
"""
|
||||
|
||||
def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
self.bpmn_process_instance = bpmn_process_instance
|
||||
engine_steps = list(
|
||||
[
|
||||
|
@ -278,7 +286,13 @@ class WorkflowExecutionService:
|
|||
self.process_instance_completer = process_instance_completer
|
||||
self.process_instance_saver = process_instance_saver
|
||||
|
||||
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
|
||||
# names of methods that do spiff stuff:
|
||||
# processor.do_engine_steps calls:
|
||||
# run_until_user_input_required_and_save
|
||||
# run_until_user_input_required
|
||||
# execution_strategy.spiff_run
|
||||
# spiff.do_engine_steps
|
||||
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
|
||||
"""Do_engine_steps."""
|
||||
with safe_assertion(ProcessInstanceLockService.has_lock(self.process_instance_model.id)) as tripped:
|
||||
if tripped:
|
||||
|
@ -288,16 +302,8 @@ class WorkflowExecutionService:
|
|||
)
|
||||
|
||||
try:
|
||||
self.bpmn_process_instance.refresh_waiting_tasks()
|
||||
self.run_until_user_input_required(exit_at)
|
||||
|
||||
# TODO: implicit re-entrant locks here `with_dequeued`
|
||||
self.execution_strategy.do_engine_steps(self.bpmn_process_instance, exit_at)
|
||||
|
||||
if self.bpmn_process_instance.is_completed():
|
||||
self.process_instance_completer(self.bpmn_process_instance)
|
||||
|
||||
self.process_bpmn_messages()
|
||||
self.queue_waiting_receive_messages()
|
||||
except SpiffWorkflowException as swe:
|
||||
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
|
||||
|
||||
|
@ -308,6 +314,31 @@ class WorkflowExecutionService:
|
|||
if save:
|
||||
self.process_instance_saver()
|
||||
|
||||
def run_until_user_input_required(self, exit_at: None = None) -> None:
|
||||
"""Keeps running tasks until there are no non-human READY tasks.
|
||||
|
||||
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
|
||||
"""
|
||||
self.bpmn_process_instance.refresh_waiting_tasks()
|
||||
|
||||
# TODO: implicit re-entrant locks here `with_dequeued`
|
||||
self.execution_strategy.spiff_run(self.bpmn_process_instance, exit_at)
|
||||
|
||||
if self.bpmn_process_instance.is_completed():
|
||||
self.process_instance_completer(self.bpmn_process_instance)
|
||||
|
||||
self.process_bpmn_messages()
|
||||
self.queue_waiting_receive_messages()
|
||||
|
||||
self.bpmn_process_instance.refresh_waiting_tasks()
|
||||
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
|
||||
non_human_waiting_task = next(
|
||||
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
|
||||
)
|
||||
# non_human_waiting_task = next((p for p in ready_tasks), None)
|
||||
if non_human_waiting_task is not None:
|
||||
self.run_until_user_input_required(exit_at)
|
||||
|
||||
def process_bpmn_messages(self) -> None:
|
||||
"""Process_bpmn_messages."""
|
||||
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
|
||||
|
@ -379,11 +410,11 @@ class WorkflowExecutionService:
|
|||
class ProfiledWorkflowExecutionService(WorkflowExecutionService):
|
||||
"""A profiled version of the workflow execution service."""
|
||||
|
||||
def do_engine_steps(self, exit_at: None = None, save: bool = False) -> None:
|
||||
def run_until_user_input_required_and_save(self, exit_at: None = None, save: bool = False) -> None:
|
||||
"""__do_engine_steps."""
|
||||
import cProfile
|
||||
from pstats import SortKey
|
||||
|
||||
with cProfile.Profile() as pr:
|
||||
super().do_engine_steps(exit_at=exit_at, save=save)
|
||||
super().run_until_user_input_required_and_save(exit_at=exit_at, save=save)
|
||||
pr.print_stats(sort=SortKey.CUMULATIVE)
|
||||
|
|
Loading…
Reference in New Issue